Skip to main content

gestalt/
runtime_log_host.rs

1use std::time::SystemTime;
2
3use hyper_util::rt::TokioIo;
4use tokio::net::UnixStream;
5use tonic::Request;
6use tonic::metadata::MetadataValue;
7use tonic::service::Interceptor;
8use tonic::service::interceptor::InterceptedService;
9use tonic::transport::{Channel, ClientTlsConfig, Endpoint, Uri};
10use tower::service_fn;
11
12use crate::env::{ENV_HOST_SERVICE_SOCKET, ENV_HOST_SERVICE_TOKEN};
13use crate::generated::v1::{
14    self as pb, runtime_log_host_client::RuntimeLogHostClient as ProtoRuntimeLogHostClient,
15};
16
17type RuntimeLogHostTransport = InterceptedService<Channel, RelayTokenInterceptor>;
18
19/// Environment variable containing the current runtime session id.
20pub const ENV_RUNTIME_SESSION_ID: &str = "GESTALT_RUNTIME_SESSION_ID";
21const RUNTIME_LOG_RELAY_TOKEN_HEADER: &str = "x-gestalt-host-service-relay-token";
22
23#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
24#[repr(i32)]
25/// Runtime log stream for runtime log entries.
26pub enum RuntimeLogStream {
27    #[default]
28    /// The `Unspecified` variant.
29    Unspecified = 0,
30    /// The `Stdout` variant.
31    Stdout = 1,
32    /// The `Stderr` variant.
33    Stderr = 2,
34    /// The `Runtime` variant.
35    Runtime = 3,
36}
37
38impl RuntimeLogStream {
39    const fn as_i32(self) -> i32 {
40        self as i32
41    }
42}
43
44#[derive(Clone, Debug, PartialEq, Eq)]
45/// One runtime log entry.
46pub struct RuntimeLogEntry {
47    /// The `stream` field.
48    pub stream: RuntimeLogStream,
49    /// The `message` field.
50    pub message: String,
51    /// The `observed_at` field.
52    pub observed_at: SystemTime,
53    /// The `source_seq` field.
54    pub source_seq: i64,
55}
56
57#[derive(Clone, Debug, Default, PartialEq, Eq)]
58/// Request for appending runtime logs.
59pub struct AppendRuntimeLogsRequest {
60    /// The `session_id` field.
61    pub session_id: String,
62    /// The `logs` field.
63    pub logs: Vec<RuntimeLogEntry>,
64}
65
66#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
67/// Response returned after appending runtime logs.
68pub struct AppendRuntimeLogsResponse {
69    /// The `last_seq` field.
70    pub last_seq: i64,
71}
72
73fn append_logs_request_to_proto(request: AppendRuntimeLogsRequest) -> pb::AppendRuntimeLogsRequest {
74    pb::AppendRuntimeLogsRequest {
75        session_id: request.session_id,
76        logs: request
77            .logs
78            .into_iter()
79            .map(|entry| pb::RuntimeLogEntry {
80                stream: entry.stream.as_i32(),
81                message: entry.message,
82                observed_at: Some(crate::protocol::timestamp_from_system_time(
83                    entry.observed_at,
84                )),
85                source_seq: entry.source_seq,
86            })
87            .collect(),
88    }
89}
90
91fn append_logs_response_from_proto(
92    response: pb::AppendRuntimeLogsResponse,
93) -> AppendRuntimeLogsResponse {
94    AppendRuntimeLogsResponse {
95        last_seq: response.last_seq,
96    }
97}
98
99#[derive(Debug, thiserror::Error)]
100/// Errors returned by [`RuntimeLogHost`].
101pub enum RuntimeLogHostError {
102    /// The host-service transport could not be created.
103    #[error("{0}")]
104    Transport(#[from] tonic::transport::Error),
105    /// The host-service RPC returned a gRPC status.
106    #[error("{0}")]
107    Status(#[from] tonic::Status),
108    /// Required environment or target configuration was invalid.
109    #[error("{0}")]
110    Env(String),
111}
112
113/// Client for appending runtime logs to the host.
114pub struct RuntimeLogHost {
115    client: ProtoRuntimeLogHostClient<RuntimeLogHostTransport>,
116    source_seq: i64,
117}
118
119impl RuntimeLogHost {
120    /// Connects to the runtime-log host service described by the environment.
121    pub async fn connect() -> std::result::Result<Self, RuntimeLogHostError> {
122        let socket_path = std::env::var(ENV_HOST_SERVICE_SOCKET).map_err(|_| {
123            RuntimeLogHostError::Env(format!("{ENV_HOST_SERVICE_SOCKET} is not set"))
124        })?;
125        let relay_token = std::env::var(ENV_HOST_SERVICE_TOKEN).unwrap_or_default();
126        let channel = match parse_runtime_log_host_target(&socket_path)? {
127            RuntimeLogHostTarget::Unix(path) => {
128                Endpoint::try_from("http://[::]:50051")?
129                    .connect_with_connector(service_fn(move |_: Uri| {
130                        let path = path.clone();
131                        async move { UnixStream::connect(path).await.map(TokioIo::new) }
132                    }))
133                    .await?
134            }
135            RuntimeLogHostTarget::Tcp(address) => {
136                Endpoint::from_shared(format!("http://{address}"))?
137                    .connect()
138                    .await?
139            }
140            RuntimeLogHostTarget::Tls(address) => {
141                Endpoint::from_shared(format!("https://{address}"))?
142                    .tls_config(ClientTlsConfig::new().with_native_roots())?
143                    .connect()
144                    .await?
145            }
146        };
147
148        Ok(Self {
149            client: ProtoRuntimeLogHostClient::with_interceptor(
150                channel,
151                relay_token_interceptor(relay_token.trim())?,
152            ),
153            source_seq: 0,
154        })
155    }
156
157    /// Appends logs using a raw protocol request message.
158    pub async fn append_logs(
159        &mut self,
160        request: AppendRuntimeLogsRequest,
161    ) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
162        Ok(append_logs_response_from_proto(
163            self.client
164                .append_logs(append_logs_request_to_proto(request))
165                .await?
166                .into_inner(),
167        ))
168    }
169
170    /// Appends one log entry for an explicit session id.
171    pub async fn append(
172        &mut self,
173        session_id: impl Into<String>,
174        stream: RuntimeLogStream,
175        message: impl Into<String>,
176    ) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
177        self.source_seq += 1;
178        let source_seq = self.source_seq;
179        self.append_entry(session_id, stream, message, None, source_seq)
180            .await
181    }
182
183    /// Appends one log entry for `GESTALT_RUNTIME_SESSION_ID`.
184    pub async fn append_current(
185        &mut self,
186        stream: RuntimeLogStream,
187        message: impl Into<String>,
188    ) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
189        self.append(runtime_session_id()?, stream, message).await
190    }
191
192    /// Appends one log entry with an explicit timestamp and source sequence.
193    pub async fn append_entry(
194        &mut self,
195        session_id: impl Into<String>,
196        stream: RuntimeLogStream,
197        message: impl Into<String>,
198        observed_at: Option<SystemTime>,
199        source_seq: i64,
200    ) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
201        self.source_seq = self.source_seq.max(source_seq);
202        self.append_logs(AppendRuntimeLogsRequest {
203            session_id: session_id.into(),
204            logs: vec![RuntimeLogEntry {
205                stream,
206                message: message.into(),
207                observed_at: observed_at.unwrap_or_else(SystemTime::now),
208                source_seq,
209            }],
210        })
211        .await
212    }
213
214    /// Appends one explicit log entry for `GESTALT_RUNTIME_SESSION_ID`.
215    pub async fn append_current_entry(
216        &mut self,
217        stream: RuntimeLogStream,
218        message: impl Into<String>,
219        observed_at: Option<SystemTime>,
220        source_seq: i64,
221    ) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
222        self.append_entry(
223            runtime_session_id()?,
224            stream,
225            message,
226            observed_at,
227            source_seq,
228        )
229        .await
230    }
231
232    /// Appends one stdout log entry for an explicit session id.
233    pub async fn append_stdout(
234        &mut self,
235        session_id: impl Into<String>,
236        message: impl Into<String>,
237    ) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
238        self.append(session_id, RuntimeLogStream::Stdout, message)
239            .await
240    }
241
242    /// Appends one stdout log entry for `GESTALT_RUNTIME_SESSION_ID`.
243    pub async fn append_current_stdout(
244        &mut self,
245        message: impl Into<String>,
246    ) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
247        self.append_current(RuntimeLogStream::Stdout, message).await
248    }
249
250    /// Appends one stderr log entry for an explicit session id.
251    pub async fn append_stderr(
252        &mut self,
253        session_id: impl Into<String>,
254        message: impl Into<String>,
255    ) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
256        self.append(session_id, RuntimeLogStream::Stderr, message)
257            .await
258    }
259
260    /// Appends one stderr log entry for `GESTALT_RUNTIME_SESSION_ID`.
261    pub async fn append_current_stderr(
262        &mut self,
263        message: impl Into<String>,
264    ) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
265        self.append_current(RuntimeLogStream::Stderr, message).await
266    }
267
268    /// Appends one runtime log entry for an explicit session id.
269    pub async fn append_runtime(
270        &mut self,
271        session_id: impl Into<String>,
272        message: impl Into<String>,
273    ) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
274        self.append(session_id, RuntimeLogStream::Runtime, message)
275            .await
276    }
277
278    /// Appends one runtime log entry for `GESTALT_RUNTIME_SESSION_ID`.
279    pub async fn append_current_runtime(
280        &mut self,
281        message: impl Into<String>,
282    ) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
283        self.append_current(RuntimeLogStream::Runtime, message)
284            .await
285    }
286}
287
288#[derive(Clone)]
289struct RelayTokenInterceptor {
290    token: Option<MetadataValue<tonic::metadata::Ascii>>,
291}
292
293impl Interceptor for RelayTokenInterceptor {
294    fn call(
295        &mut self,
296        mut request: Request<()>,
297    ) -> std::result::Result<Request<()>, tonic::Status> {
298        if let Some(token) = self.token.clone() {
299            request
300                .metadata_mut()
301                .insert(RUNTIME_LOG_RELAY_TOKEN_HEADER, token);
302        }
303        Ok(request)
304    }
305}
306
307fn relay_token_interceptor(
308    token: &str,
309) -> std::result::Result<RelayTokenInterceptor, RuntimeLogHostError> {
310    let trimmed = token.trim();
311    let token = if trimmed.is_empty() {
312        None
313    } else {
314        Some(MetadataValue::try_from(trimmed).map_err(|err| {
315            RuntimeLogHostError::Env(format!(
316                "runtime log host: invalid relay token metadata: {err}"
317            ))
318        })?)
319    };
320    Ok(RelayTokenInterceptor { token })
321}
322
323enum RuntimeLogHostTarget {
324    Unix(String),
325    Tcp(String),
326    Tls(String),
327}
328
329fn parse_runtime_log_host_target(
330    raw: &str,
331) -> std::result::Result<RuntimeLogHostTarget, RuntimeLogHostError> {
332    let target = raw.trim();
333    if target.is_empty() {
334        return Err(RuntimeLogHostError::Env(
335            "runtime log host: transport target is required".to_string(),
336        ));
337    }
338    if let Some(address) = target.strip_prefix("tcp://") {
339        let address = address.trim();
340        if address.is_empty() {
341            return Err(RuntimeLogHostError::Env(format!(
342                "runtime log host: tcp target {raw:?} is missing host:port"
343            )));
344        }
345        return Ok(RuntimeLogHostTarget::Tcp(address.to_string()));
346    }
347    if let Some(address) = target.strip_prefix("tls://") {
348        let address = address.trim();
349        if address.is_empty() {
350            return Err(RuntimeLogHostError::Env(format!(
351                "runtime log host: tls target {raw:?} is missing host:port"
352            )));
353        }
354        return Ok(RuntimeLogHostTarget::Tls(address.to_string()));
355    }
356    if let Some(path) = target.strip_prefix("unix://") {
357        let path = path.trim();
358        if path.is_empty() {
359            return Err(RuntimeLogHostError::Env(format!(
360                "runtime log host: unix target {raw:?} is missing a socket path"
361            )));
362        }
363        return Ok(RuntimeLogHostTarget::Unix(path.to_string()));
364    }
365    if target.contains("://") {
366        return Err(RuntimeLogHostError::Env(format!(
367            "runtime log host: unsupported target scheme in {raw:?}"
368        )));
369    }
370    Ok(RuntimeLogHostTarget::Unix(target.to_string()))
371}
372
373/// Returns the current runtime session id from `GESTALT_RUNTIME_SESSION_ID`.
374pub fn runtime_session_id() -> std::result::Result<String, RuntimeLogHostError> {
375    let session_id = std::env::var(ENV_RUNTIME_SESSION_ID)
376        .unwrap_or_default()
377        .trim()
378        .to_string();
379    if session_id.is_empty() {
380        return Err(RuntimeLogHostError::Env(format!(
381            "runtime session: {ENV_RUNTIME_SESSION_ID} is not set"
382        )));
383    }
384    Ok(session_id)
385}