1use std::time::SystemTime;
2
3use hyper_util::rt::TokioIo;
4use tokio::net::UnixStream;
5use tonic::Request;
6use tonic::metadata::MetadataValue;
7use tonic::service::Interceptor;
8use tonic::service::interceptor::InterceptedService;
9use tonic::transport::{Channel, ClientTlsConfig, Endpoint, Uri};
10use tower::service_fn;
11
12use crate::env::{ENV_HOST_SERVICE_SOCKET, ENV_HOST_SERVICE_TOKEN};
13use crate::generated::v1::{
14 self as pb, runtime_log_host_client::RuntimeLogHostClient as ProtoRuntimeLogHostClient,
15};
16
17type RuntimeLogHostTransport = InterceptedService<Channel, RelayTokenInterceptor>;
18
19pub const ENV_RUNTIME_SESSION_ID: &str = "GESTALT_RUNTIME_SESSION_ID";
21const RUNTIME_LOG_RELAY_TOKEN_HEADER: &str = "x-gestalt-host-service-relay-token";
22
23#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
24#[repr(i32)]
25pub enum RuntimeLogStream {
27 #[default]
28 Unspecified = 0,
30 Stdout = 1,
32 Stderr = 2,
34 Runtime = 3,
36}
37
38impl RuntimeLogStream {
39 const fn as_i32(self) -> i32 {
40 self as i32
41 }
42}
43
44#[derive(Clone, Debug, PartialEq, Eq)]
45pub struct RuntimeLogEntry {
47 pub stream: RuntimeLogStream,
49 pub message: String,
51 pub observed_at: SystemTime,
53 pub source_seq: i64,
55}
56
57#[derive(Clone, Debug, Default, PartialEq, Eq)]
58pub struct AppendRuntimeLogsRequest {
60 pub session_id: String,
62 pub logs: Vec<RuntimeLogEntry>,
64}
65
66#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
67pub struct AppendRuntimeLogsResponse {
69 pub last_seq: i64,
71}
72
73fn append_logs_request_to_proto(request: AppendRuntimeLogsRequest) -> pb::AppendRuntimeLogsRequest {
74 pb::AppendRuntimeLogsRequest {
75 session_id: request.session_id,
76 logs: request
77 .logs
78 .into_iter()
79 .map(|entry| pb::RuntimeLogEntry {
80 stream: entry.stream.as_i32(),
81 message: entry.message,
82 observed_at: Some(crate::protocol::timestamp_from_system_time(
83 entry.observed_at,
84 )),
85 source_seq: entry.source_seq,
86 })
87 .collect(),
88 }
89}
90
91fn append_logs_response_from_proto(
92 response: pb::AppendRuntimeLogsResponse,
93) -> AppendRuntimeLogsResponse {
94 AppendRuntimeLogsResponse {
95 last_seq: response.last_seq,
96 }
97}
98
99#[derive(Debug, thiserror::Error)]
100pub enum RuntimeLogHostError {
102 #[error("{0}")]
104 Transport(#[from] tonic::transport::Error),
105 #[error("{0}")]
107 Status(#[from] tonic::Status),
108 #[error("{0}")]
110 Env(String),
111}
112
113pub struct RuntimeLogHost {
115 client: ProtoRuntimeLogHostClient<RuntimeLogHostTransport>,
116 source_seq: i64,
117}
118
119impl RuntimeLogHost {
120 pub async fn connect() -> std::result::Result<Self, RuntimeLogHostError> {
122 let socket_path = std::env::var(ENV_HOST_SERVICE_SOCKET).map_err(|_| {
123 RuntimeLogHostError::Env(format!("{ENV_HOST_SERVICE_SOCKET} is not set"))
124 })?;
125 let relay_token = std::env::var(ENV_HOST_SERVICE_TOKEN).unwrap_or_default();
126 let channel = match parse_runtime_log_host_target(&socket_path)? {
127 RuntimeLogHostTarget::Unix(path) => {
128 Endpoint::try_from("http://[::]:50051")?
129 .connect_with_connector(service_fn(move |_: Uri| {
130 let path = path.clone();
131 async move { UnixStream::connect(path).await.map(TokioIo::new) }
132 }))
133 .await?
134 }
135 RuntimeLogHostTarget::Tcp(address) => {
136 Endpoint::from_shared(format!("http://{address}"))?
137 .connect()
138 .await?
139 }
140 RuntimeLogHostTarget::Tls(address) => {
141 Endpoint::from_shared(format!("https://{address}"))?
142 .tls_config(ClientTlsConfig::new().with_native_roots())?
143 .connect()
144 .await?
145 }
146 };
147
148 Ok(Self {
149 client: ProtoRuntimeLogHostClient::with_interceptor(
150 channel,
151 relay_token_interceptor(relay_token.trim())?,
152 ),
153 source_seq: 0,
154 })
155 }
156
157 pub async fn append_logs(
159 &mut self,
160 request: AppendRuntimeLogsRequest,
161 ) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
162 Ok(append_logs_response_from_proto(
163 self.client
164 .append_logs(append_logs_request_to_proto(request))
165 .await?
166 .into_inner(),
167 ))
168 }
169
170 pub async fn append(
172 &mut self,
173 session_id: impl Into<String>,
174 stream: RuntimeLogStream,
175 message: impl Into<String>,
176 ) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
177 self.source_seq += 1;
178 let source_seq = self.source_seq;
179 self.append_entry(session_id, stream, message, None, source_seq)
180 .await
181 }
182
183 pub async fn append_current(
185 &mut self,
186 stream: RuntimeLogStream,
187 message: impl Into<String>,
188 ) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
189 self.append(runtime_session_id()?, stream, message).await
190 }
191
192 pub async fn append_entry(
194 &mut self,
195 session_id: impl Into<String>,
196 stream: RuntimeLogStream,
197 message: impl Into<String>,
198 observed_at: Option<SystemTime>,
199 source_seq: i64,
200 ) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
201 self.source_seq = self.source_seq.max(source_seq);
202 self.append_logs(AppendRuntimeLogsRequest {
203 session_id: session_id.into(),
204 logs: vec![RuntimeLogEntry {
205 stream,
206 message: message.into(),
207 observed_at: observed_at.unwrap_or_else(SystemTime::now),
208 source_seq,
209 }],
210 })
211 .await
212 }
213
214 pub async fn append_current_entry(
216 &mut self,
217 stream: RuntimeLogStream,
218 message: impl Into<String>,
219 observed_at: Option<SystemTime>,
220 source_seq: i64,
221 ) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
222 self.append_entry(
223 runtime_session_id()?,
224 stream,
225 message,
226 observed_at,
227 source_seq,
228 )
229 .await
230 }
231
232 pub async fn append_stdout(
234 &mut self,
235 session_id: impl Into<String>,
236 message: impl Into<String>,
237 ) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
238 self.append(session_id, RuntimeLogStream::Stdout, message)
239 .await
240 }
241
242 pub async fn append_current_stdout(
244 &mut self,
245 message: impl Into<String>,
246 ) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
247 self.append_current(RuntimeLogStream::Stdout, message).await
248 }
249
250 pub async fn append_stderr(
252 &mut self,
253 session_id: impl Into<String>,
254 message: impl Into<String>,
255 ) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
256 self.append(session_id, RuntimeLogStream::Stderr, message)
257 .await
258 }
259
260 pub async fn append_current_stderr(
262 &mut self,
263 message: impl Into<String>,
264 ) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
265 self.append_current(RuntimeLogStream::Stderr, message).await
266 }
267
268 pub async fn append_runtime(
270 &mut self,
271 session_id: impl Into<String>,
272 message: impl Into<String>,
273 ) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
274 self.append(session_id, RuntimeLogStream::Runtime, message)
275 .await
276 }
277
278 pub async fn append_current_runtime(
280 &mut self,
281 message: impl Into<String>,
282 ) -> std::result::Result<AppendRuntimeLogsResponse, RuntimeLogHostError> {
283 self.append_current(RuntimeLogStream::Runtime, message)
284 .await
285 }
286}
287
288#[derive(Clone)]
289struct RelayTokenInterceptor {
290 token: Option<MetadataValue<tonic::metadata::Ascii>>,
291}
292
293impl Interceptor for RelayTokenInterceptor {
294 fn call(
295 &mut self,
296 mut request: Request<()>,
297 ) -> std::result::Result<Request<()>, tonic::Status> {
298 if let Some(token) = self.token.clone() {
299 request
300 .metadata_mut()
301 .insert(RUNTIME_LOG_RELAY_TOKEN_HEADER, token);
302 }
303 Ok(request)
304 }
305}
306
307fn relay_token_interceptor(
308 token: &str,
309) -> std::result::Result<RelayTokenInterceptor, RuntimeLogHostError> {
310 let trimmed = token.trim();
311 let token = if trimmed.is_empty() {
312 None
313 } else {
314 Some(MetadataValue::try_from(trimmed).map_err(|err| {
315 RuntimeLogHostError::Env(format!(
316 "runtime log host: invalid relay token metadata: {err}"
317 ))
318 })?)
319 };
320 Ok(RelayTokenInterceptor { token })
321}
322
323enum RuntimeLogHostTarget {
324 Unix(String),
325 Tcp(String),
326 Tls(String),
327}
328
329fn parse_runtime_log_host_target(
330 raw: &str,
331) -> std::result::Result<RuntimeLogHostTarget, RuntimeLogHostError> {
332 let target = raw.trim();
333 if target.is_empty() {
334 return Err(RuntimeLogHostError::Env(
335 "runtime log host: transport target is required".to_string(),
336 ));
337 }
338 if let Some(address) = target.strip_prefix("tcp://") {
339 let address = address.trim();
340 if address.is_empty() {
341 return Err(RuntimeLogHostError::Env(format!(
342 "runtime log host: tcp target {raw:?} is missing host:port"
343 )));
344 }
345 return Ok(RuntimeLogHostTarget::Tcp(address.to_string()));
346 }
347 if let Some(address) = target.strip_prefix("tls://") {
348 let address = address.trim();
349 if address.is_empty() {
350 return Err(RuntimeLogHostError::Env(format!(
351 "runtime log host: tls target {raw:?} is missing host:port"
352 )));
353 }
354 return Ok(RuntimeLogHostTarget::Tls(address.to_string()));
355 }
356 if let Some(path) = target.strip_prefix("unix://") {
357 let path = path.trim();
358 if path.is_empty() {
359 return Err(RuntimeLogHostError::Env(format!(
360 "runtime log host: unix target {raw:?} is missing a socket path"
361 )));
362 }
363 return Ok(RuntimeLogHostTarget::Unix(path.to_string()));
364 }
365 if target.contains("://") {
366 return Err(RuntimeLogHostError::Env(format!(
367 "runtime log host: unsupported target scheme in {raw:?}"
368 )));
369 }
370 Ok(RuntimeLogHostTarget::Unix(target.to_string()))
371}
372
373pub fn runtime_session_id() -> std::result::Result<String, RuntimeLogHostError> {
375 let session_id = std::env::var(ENV_RUNTIME_SESSION_ID)
376 .unwrap_or_default()
377 .trim()
378 .to_string();
379 if session_id.is_empty() {
380 return Err(RuntimeLogHostError::Env(format!(
381 "runtime session: {ENV_RUNTIME_SESSION_ID} is not set"
382 )));
383 }
384 Ok(session_id)
385}