Skip to main content

gestalt/
runtime_log_host.rs

1use std::time::{SystemTime, UNIX_EPOCH};
2
3use hyper_util::rt::TokioIo;
4use tokio::net::UnixStream;
5use tonic::Request;
6use tonic::metadata::MetadataValue;
7use tonic::service::Interceptor;
8use tonic::service::interceptor::InterceptedService;
9use tonic::transport::{Channel, ClientTlsConfig, Endpoint, Uri};
10use tower::service_fn;
11
12use crate::generated::v1::{
13    self as pb,
14    plugin_runtime_log_host_client::PluginRuntimeLogHostClient as ProtoPluginRuntimeLogHostClient,
15};
16
17type RuntimeLogHostTransport = InterceptedService<Channel, RelayTokenInterceptor>;
18
19/// Environment variable containing the runtime-log host-service target.
20pub const ENV_RUNTIME_LOG_HOST_SOCKET: &str = "GESTALT_RUNTIME_LOG_SOCKET";
21/// Environment variable containing the optional runtime-log relay token.
22pub const ENV_RUNTIME_LOG_HOST_SOCKET_TOKEN: &str = "GESTALT_RUNTIME_LOG_SOCKET_TOKEN";
23/// Environment variable containing the current plugin-runtime session id.
24pub const ENV_RUNTIME_SESSION_ID: &str = "GESTALT_RUNTIME_SESSION_ID";
25const RUNTIME_LOG_RELAY_TOKEN_HEADER: &str = "x-gestalt-host-service-relay-token";
26
27/// Runtime log stream enum generated from the provider protocol.
28pub type RuntimeLogStream = pb::PluginRuntimeLogStream;
29
30#[derive(Debug, thiserror::Error)]
31/// Errors returned by [`RuntimeLogHost`].
32pub enum RuntimeLogHostError {
33    /// The host-service transport could not be created.
34    #[error("{0}")]
35    Transport(#[from] tonic::transport::Error),
36    /// The host-service RPC returned a gRPC status.
37    #[error("{0}")]
38    Status(#[from] tonic::Status),
39    /// Required environment or target configuration was invalid.
40    #[error("{0}")]
41    Env(String),
42}
43
44/// Client for appending plugin-runtime logs to the host.
45pub struct RuntimeLogHost {
46    client: ProtoPluginRuntimeLogHostClient<RuntimeLogHostTransport>,
47    source_seq: i64,
48}
49
50impl RuntimeLogHost {
51    /// Connects to the runtime-log host service described by the environment.
52    pub async fn connect() -> std::result::Result<Self, RuntimeLogHostError> {
53        let socket_path = std::env::var(ENV_RUNTIME_LOG_HOST_SOCKET).map_err(|_| {
54            RuntimeLogHostError::Env(format!("{ENV_RUNTIME_LOG_HOST_SOCKET} is not set"))
55        })?;
56        let relay_token = std::env::var(ENV_RUNTIME_LOG_HOST_SOCKET_TOKEN).unwrap_or_default();
57        let channel = match parse_runtime_log_host_target(&socket_path)? {
58            RuntimeLogHostTarget::Unix(path) => {
59                Endpoint::try_from("http://[::]:50051")?
60                    .connect_with_connector(service_fn(move |_: Uri| {
61                        let path = path.clone();
62                        async move { UnixStream::connect(path).await.map(TokioIo::new) }
63                    }))
64                    .await?
65            }
66            RuntimeLogHostTarget::Tcp(address) => {
67                Endpoint::from_shared(format!("http://{address}"))?
68                    .connect()
69                    .await?
70            }
71            RuntimeLogHostTarget::Tls(address) => {
72                Endpoint::from_shared(format!("https://{address}"))?
73                    .tls_config(ClientTlsConfig::new().with_native_roots())?
74                    .connect()
75                    .await?
76            }
77        };
78
79        Ok(Self {
80            client: ProtoPluginRuntimeLogHostClient::with_interceptor(
81                channel,
82                relay_token_interceptor(relay_token.trim())?,
83            ),
84            source_seq: 0,
85        })
86    }
87
88    /// Appends logs using a raw protocol request message.
89    pub async fn append_logs(
90        &mut self,
91        request: pb::AppendPluginRuntimeLogsRequest,
92    ) -> std::result::Result<pb::AppendPluginRuntimeLogsResponse, RuntimeLogHostError> {
93        Ok(self.client.append_logs(request).await?.into_inner())
94    }
95
96    /// Appends one log entry for an explicit session id.
97    pub async fn append(
98        &mut self,
99        session_id: impl Into<String>,
100        stream: RuntimeLogStream,
101        message: impl Into<String>,
102    ) -> std::result::Result<pb::AppendPluginRuntimeLogsResponse, RuntimeLogHostError> {
103        self.source_seq += 1;
104        let source_seq = self.source_seq;
105        self.append_entry(session_id, stream, message, None, source_seq)
106            .await
107    }
108
109    /// Appends one log entry for `GESTALT_RUNTIME_SESSION_ID`.
110    pub async fn append_current(
111        &mut self,
112        stream: RuntimeLogStream,
113        message: impl Into<String>,
114    ) -> std::result::Result<pb::AppendPluginRuntimeLogsResponse, RuntimeLogHostError> {
115        self.append(runtime_session_id()?, stream, message).await
116    }
117
118    /// Appends one log entry with an explicit timestamp and source sequence.
119    pub async fn append_entry(
120        &mut self,
121        session_id: impl Into<String>,
122        stream: RuntimeLogStream,
123        message: impl Into<String>,
124        observed_at: Option<prost_types::Timestamp>,
125        source_seq: i64,
126    ) -> std::result::Result<pb::AppendPluginRuntimeLogsResponse, RuntimeLogHostError> {
127        self.source_seq = self.source_seq.max(source_seq);
128        self.append_logs(pb::AppendPluginRuntimeLogsRequest {
129            session_id: session_id.into(),
130            logs: vec![pb::PluginRuntimeLogEntry {
131                stream: stream as i32,
132                message: message.into(),
133                observed_at: Some(observed_at.unwrap_or_else(timestamp_now)),
134                source_seq,
135            }],
136        })
137        .await
138    }
139
140    /// Appends one explicit log entry for `GESTALT_RUNTIME_SESSION_ID`.
141    pub async fn append_current_entry(
142        &mut self,
143        stream: RuntimeLogStream,
144        message: impl Into<String>,
145        observed_at: Option<prost_types::Timestamp>,
146        source_seq: i64,
147    ) -> std::result::Result<pb::AppendPluginRuntimeLogsResponse, RuntimeLogHostError> {
148        self.append_entry(
149            runtime_session_id()?,
150            stream,
151            message,
152            observed_at,
153            source_seq,
154        )
155        .await
156    }
157
158    /// Appends one stdout log entry for an explicit session id.
159    pub async fn append_stdout(
160        &mut self,
161        session_id: impl Into<String>,
162        message: impl Into<String>,
163    ) -> std::result::Result<pb::AppendPluginRuntimeLogsResponse, RuntimeLogHostError> {
164        self.append(session_id, RuntimeLogStream::Stdout, message)
165            .await
166    }
167
168    /// Appends one stdout log entry for `GESTALT_RUNTIME_SESSION_ID`.
169    pub async fn append_current_stdout(
170        &mut self,
171        message: impl Into<String>,
172    ) -> std::result::Result<pb::AppendPluginRuntimeLogsResponse, RuntimeLogHostError> {
173        self.append_current(RuntimeLogStream::Stdout, message).await
174    }
175
176    /// Appends one stderr log entry for an explicit session id.
177    pub async fn append_stderr(
178        &mut self,
179        session_id: impl Into<String>,
180        message: impl Into<String>,
181    ) -> std::result::Result<pb::AppendPluginRuntimeLogsResponse, RuntimeLogHostError> {
182        self.append(session_id, RuntimeLogStream::Stderr, message)
183            .await
184    }
185
186    /// Appends one stderr log entry for `GESTALT_RUNTIME_SESSION_ID`.
187    pub async fn append_current_stderr(
188        &mut self,
189        message: impl Into<String>,
190    ) -> std::result::Result<pb::AppendPluginRuntimeLogsResponse, RuntimeLogHostError> {
191        self.append_current(RuntimeLogStream::Stderr, message).await
192    }
193
194    /// Appends one runtime log entry for an explicit session id.
195    pub async fn append_runtime(
196        &mut self,
197        session_id: impl Into<String>,
198        message: impl Into<String>,
199    ) -> std::result::Result<pb::AppendPluginRuntimeLogsResponse, RuntimeLogHostError> {
200        self.append(session_id, RuntimeLogStream::Runtime, message)
201            .await
202    }
203
204    /// Appends one runtime log entry for `GESTALT_RUNTIME_SESSION_ID`.
205    pub async fn append_current_runtime(
206        &mut self,
207        message: impl Into<String>,
208    ) -> std::result::Result<pb::AppendPluginRuntimeLogsResponse, RuntimeLogHostError> {
209        self.append_current(RuntimeLogStream::Runtime, message)
210            .await
211    }
212}
213
214#[derive(Clone)]
215struct RelayTokenInterceptor {
216    token: Option<MetadataValue<tonic::metadata::Ascii>>,
217}
218
219impl Interceptor for RelayTokenInterceptor {
220    fn call(
221        &mut self,
222        mut request: Request<()>,
223    ) -> std::result::Result<Request<()>, tonic::Status> {
224        if let Some(token) = self.token.clone() {
225            request
226                .metadata_mut()
227                .insert(RUNTIME_LOG_RELAY_TOKEN_HEADER, token);
228        }
229        Ok(request)
230    }
231}
232
233fn relay_token_interceptor(
234    token: &str,
235) -> std::result::Result<RelayTokenInterceptor, RuntimeLogHostError> {
236    let trimmed = token.trim();
237    let token = if trimmed.is_empty() {
238        None
239    } else {
240        Some(MetadataValue::try_from(trimmed).map_err(|err| {
241            RuntimeLogHostError::Env(format!(
242                "runtime log host: invalid relay token metadata: {err}"
243            ))
244        })?)
245    };
246    Ok(RelayTokenInterceptor { token })
247}
248
/// Parsed form of the host-service target string.
enum RuntimeLogHostTarget {
    /// Path of a Unix domain socket (from a bare path or a `unix://` target).
    Unix(String),
    /// Address taken from a `tcp://` target.
    Tcp(String),
    /// Address taken from a `tls://` target.
    Tls(String),
}
254
255fn parse_runtime_log_host_target(
256    raw: &str,
257) -> std::result::Result<RuntimeLogHostTarget, RuntimeLogHostError> {
258    let target = raw.trim();
259    if target.is_empty() {
260        return Err(RuntimeLogHostError::Env(
261            "runtime log host: transport target is required".to_string(),
262        ));
263    }
264    if let Some(address) = target.strip_prefix("tcp://") {
265        let address = address.trim();
266        if address.is_empty() {
267            return Err(RuntimeLogHostError::Env(format!(
268                "runtime log host: tcp target {raw:?} is missing host:port"
269            )));
270        }
271        return Ok(RuntimeLogHostTarget::Tcp(address.to_string()));
272    }
273    if let Some(address) = target.strip_prefix("tls://") {
274        let address = address.trim();
275        if address.is_empty() {
276            return Err(RuntimeLogHostError::Env(format!(
277                "runtime log host: tls target {raw:?} is missing host:port"
278            )));
279        }
280        return Ok(RuntimeLogHostTarget::Tls(address.to_string()));
281    }
282    if let Some(path) = target.strip_prefix("unix://") {
283        let path = path.trim();
284        if path.is_empty() {
285            return Err(RuntimeLogHostError::Env(format!(
286                "runtime log host: unix target {raw:?} is missing a socket path"
287            )));
288        }
289        return Ok(RuntimeLogHostTarget::Unix(path.to_string()));
290    }
291    if target.contains("://") {
292        return Err(RuntimeLogHostError::Env(format!(
293            "runtime log host: unsupported target scheme in {raw:?}"
294        )));
295    }
296    Ok(RuntimeLogHostTarget::Unix(target.to_string()))
297}
298
299/// Returns the current runtime session id from `GESTALT_RUNTIME_SESSION_ID`.
300pub fn runtime_session_id() -> std::result::Result<String, RuntimeLogHostError> {
301    let session_id = std::env::var(ENV_RUNTIME_SESSION_ID)
302        .unwrap_or_default()
303        .trim()
304        .to_string();
305    if session_id.is_empty() {
306        return Err(RuntimeLogHostError::Env(format!(
307            "runtime session: {ENV_RUNTIME_SESSION_ID} is not set"
308        )));
309    }
310    Ok(session_id)
311}
312
313fn timestamp_now() -> prost_types::Timestamp {
314    let now = SystemTime::now()
315        .duration_since(UNIX_EPOCH)
316        .expect("unix epoch");
317    prost_types::Timestamp {
318        seconds: now.as_secs() as i64,
319        nanos: now.subsec_nanos() as i32,
320    }
321}