1use std::time::{SystemTime, UNIX_EPOCH};
2
3use hyper_util::rt::TokioIo;
4use tokio::net::UnixStream;
5use tonic::Request;
6use tonic::metadata::MetadataValue;
7use tonic::service::Interceptor;
8use tonic::service::interceptor::InterceptedService;
9use tonic::transport::{Channel, ClientTlsConfig, Endpoint, Uri};
10use tower::service_fn;
11
12use crate::generated::v1::{
13 self as pb,
14 plugin_runtime_log_host_client::PluginRuntimeLogHostClient as ProtoPluginRuntimeLogHostClient,
15};
16
17type RuntimeLogHostTransport = InterceptedService<Channel, RelayTokenInterceptor>;
18
/// Environment variable holding the runtime log host target that
/// `RuntimeLogHost::connect` dials.
pub const ENV_RUNTIME_LOG_HOST_SOCKET: &str = "GESTALT_RUNTIME_LOG_SOCKET";

/// Environment variable holding the optional relay token that
/// `RuntimeLogHost::connect` attaches to every request.
pub const ENV_RUNTIME_LOG_HOST_SOCKET_TOKEN: &str = "GESTALT_RUNTIME_LOG_SOCKET_TOKEN";

/// Environment variable holding the session id returned by
/// `runtime_session_id`.
pub const ENV_RUNTIME_SESSION_ID: &str = "GESTALT_RUNTIME_SESSION_ID";

/// Request metadata key under which the relay token is sent.
const RUNTIME_LOG_RELAY_TOKEN_HEADER: &str = "x-gestalt-host-service-relay-token";
26
27pub type RuntimeLogStream = pb::PluginRuntimeLogStream;
29
30#[derive(Debug, thiserror::Error)]
31pub enum RuntimeLogHostError {
33 #[error("{0}")]
35 Transport(#[from] tonic::transport::Error),
36 #[error("{0}")]
38 Status(#[from] tonic::Status),
39 #[error("{0}")]
41 Env(String),
42}
43
44pub struct RuntimeLogHost {
46 client: ProtoPluginRuntimeLogHostClient<RuntimeLogHostTransport>,
47 source_seq: i64,
48}
49
50impl RuntimeLogHost {
51 pub async fn connect() -> std::result::Result<Self, RuntimeLogHostError> {
53 let socket_path = std::env::var(ENV_RUNTIME_LOG_HOST_SOCKET).map_err(|_| {
54 RuntimeLogHostError::Env(format!("{ENV_RUNTIME_LOG_HOST_SOCKET} is not set"))
55 })?;
56 let relay_token = std::env::var(ENV_RUNTIME_LOG_HOST_SOCKET_TOKEN).unwrap_or_default();
57 let channel = match parse_runtime_log_host_target(&socket_path)? {
58 RuntimeLogHostTarget::Unix(path) => {
59 Endpoint::try_from("http://[::]:50051")?
60 .connect_with_connector(service_fn(move |_: Uri| {
61 let path = path.clone();
62 async move { UnixStream::connect(path).await.map(TokioIo::new) }
63 }))
64 .await?
65 }
66 RuntimeLogHostTarget::Tcp(address) => {
67 Endpoint::from_shared(format!("http://{address}"))?
68 .connect()
69 .await?
70 }
71 RuntimeLogHostTarget::Tls(address) => {
72 Endpoint::from_shared(format!("https://{address}"))?
73 .tls_config(ClientTlsConfig::new().with_native_roots())?
74 .connect()
75 .await?
76 }
77 };
78
79 Ok(Self {
80 client: ProtoPluginRuntimeLogHostClient::with_interceptor(
81 channel,
82 relay_token_interceptor(relay_token.trim())?,
83 ),
84 source_seq: 0,
85 })
86 }
87
88 pub async fn append_logs(
90 &mut self,
91 request: pb::AppendPluginRuntimeLogsRequest,
92 ) -> std::result::Result<pb::AppendPluginRuntimeLogsResponse, RuntimeLogHostError> {
93 Ok(self.client.append_logs(request).await?.into_inner())
94 }
95
96 pub async fn append(
98 &mut self,
99 session_id: impl Into<String>,
100 stream: RuntimeLogStream,
101 message: impl Into<String>,
102 ) -> std::result::Result<pb::AppendPluginRuntimeLogsResponse, RuntimeLogHostError> {
103 self.source_seq += 1;
104 let source_seq = self.source_seq;
105 self.append_entry(session_id, stream, message, None, source_seq)
106 .await
107 }
108
109 pub async fn append_current(
111 &mut self,
112 stream: RuntimeLogStream,
113 message: impl Into<String>,
114 ) -> std::result::Result<pb::AppendPluginRuntimeLogsResponse, RuntimeLogHostError> {
115 self.append(runtime_session_id()?, stream, message).await
116 }
117
118 pub async fn append_entry(
120 &mut self,
121 session_id: impl Into<String>,
122 stream: RuntimeLogStream,
123 message: impl Into<String>,
124 observed_at: Option<prost_types::Timestamp>,
125 source_seq: i64,
126 ) -> std::result::Result<pb::AppendPluginRuntimeLogsResponse, RuntimeLogHostError> {
127 self.source_seq = self.source_seq.max(source_seq);
128 self.append_logs(pb::AppendPluginRuntimeLogsRequest {
129 session_id: session_id.into(),
130 logs: vec![pb::PluginRuntimeLogEntry {
131 stream: stream as i32,
132 message: message.into(),
133 observed_at: Some(observed_at.unwrap_or_else(timestamp_now)),
134 source_seq,
135 }],
136 })
137 .await
138 }
139
140 pub async fn append_current_entry(
142 &mut self,
143 stream: RuntimeLogStream,
144 message: impl Into<String>,
145 observed_at: Option<prost_types::Timestamp>,
146 source_seq: i64,
147 ) -> std::result::Result<pb::AppendPluginRuntimeLogsResponse, RuntimeLogHostError> {
148 self.append_entry(
149 runtime_session_id()?,
150 stream,
151 message,
152 observed_at,
153 source_seq,
154 )
155 .await
156 }
157
158 pub async fn append_stdout(
160 &mut self,
161 session_id: impl Into<String>,
162 message: impl Into<String>,
163 ) -> std::result::Result<pb::AppendPluginRuntimeLogsResponse, RuntimeLogHostError> {
164 self.append(session_id, RuntimeLogStream::Stdout, message)
165 .await
166 }
167
168 pub async fn append_current_stdout(
170 &mut self,
171 message: impl Into<String>,
172 ) -> std::result::Result<pb::AppendPluginRuntimeLogsResponse, RuntimeLogHostError> {
173 self.append_current(RuntimeLogStream::Stdout, message).await
174 }
175
176 pub async fn append_stderr(
178 &mut self,
179 session_id: impl Into<String>,
180 message: impl Into<String>,
181 ) -> std::result::Result<pb::AppendPluginRuntimeLogsResponse, RuntimeLogHostError> {
182 self.append(session_id, RuntimeLogStream::Stderr, message)
183 .await
184 }
185
186 pub async fn append_current_stderr(
188 &mut self,
189 message: impl Into<String>,
190 ) -> std::result::Result<pb::AppendPluginRuntimeLogsResponse, RuntimeLogHostError> {
191 self.append_current(RuntimeLogStream::Stderr, message).await
192 }
193
194 pub async fn append_runtime(
196 &mut self,
197 session_id: impl Into<String>,
198 message: impl Into<String>,
199 ) -> std::result::Result<pb::AppendPluginRuntimeLogsResponse, RuntimeLogHostError> {
200 self.append(session_id, RuntimeLogStream::Runtime, message)
201 .await
202 }
203
204 pub async fn append_current_runtime(
206 &mut self,
207 message: impl Into<String>,
208 ) -> std::result::Result<pb::AppendPluginRuntimeLogsResponse, RuntimeLogHostError> {
209 self.append_current(RuntimeLogStream::Runtime, message)
210 .await
211 }
212}
213
214#[derive(Clone)]
215struct RelayTokenInterceptor {
216 token: Option<MetadataValue<tonic::metadata::Ascii>>,
217}
218
219impl Interceptor for RelayTokenInterceptor {
220 fn call(
221 &mut self,
222 mut request: Request<()>,
223 ) -> std::result::Result<Request<()>, tonic::Status> {
224 if let Some(token) = self.token.clone() {
225 request
226 .metadata_mut()
227 .insert(RUNTIME_LOG_RELAY_TOKEN_HEADER, token);
228 }
229 Ok(request)
230 }
231}
232
233fn relay_token_interceptor(
234 token: &str,
235) -> std::result::Result<RelayTokenInterceptor, RuntimeLogHostError> {
236 let trimmed = token.trim();
237 let token = if trimmed.is_empty() {
238 None
239 } else {
240 Some(MetadataValue::try_from(trimmed).map_err(|err| {
241 RuntimeLogHostError::Env(format!(
242 "runtime log host: invalid relay token metadata: {err}"
243 ))
244 })?)
245 };
246 Ok(RelayTokenInterceptor { token })
247}
248
/// Where the runtime log host can be reached, as parsed from configuration.
enum RuntimeLogHostTarget {
    /// Filesystem path of a Unix domain socket.
    Unix(String),
    /// Address for a plaintext connection (the part after `tcp://`).
    Tcp(String),
    /// Address for a TLS connection (the part after `tls://`).
    Tls(String),
}
254
255fn parse_runtime_log_host_target(
256 raw: &str,
257) -> std::result::Result<RuntimeLogHostTarget, RuntimeLogHostError> {
258 let target = raw.trim();
259 if target.is_empty() {
260 return Err(RuntimeLogHostError::Env(
261 "runtime log host: transport target is required".to_string(),
262 ));
263 }
264 if let Some(address) = target.strip_prefix("tcp://") {
265 let address = address.trim();
266 if address.is_empty() {
267 return Err(RuntimeLogHostError::Env(format!(
268 "runtime log host: tcp target {raw:?} is missing host:port"
269 )));
270 }
271 return Ok(RuntimeLogHostTarget::Tcp(address.to_string()));
272 }
273 if let Some(address) = target.strip_prefix("tls://") {
274 let address = address.trim();
275 if address.is_empty() {
276 return Err(RuntimeLogHostError::Env(format!(
277 "runtime log host: tls target {raw:?} is missing host:port"
278 )));
279 }
280 return Ok(RuntimeLogHostTarget::Tls(address.to_string()));
281 }
282 if let Some(path) = target.strip_prefix("unix://") {
283 let path = path.trim();
284 if path.is_empty() {
285 return Err(RuntimeLogHostError::Env(format!(
286 "runtime log host: unix target {raw:?} is missing a socket path"
287 )));
288 }
289 return Ok(RuntimeLogHostTarget::Unix(path.to_string()));
290 }
291 if target.contains("://") {
292 return Err(RuntimeLogHostError::Env(format!(
293 "runtime log host: unsupported target scheme in {raw:?}"
294 )));
295 }
296 Ok(RuntimeLogHostTarget::Unix(target.to_string()))
297}
298
299pub fn runtime_session_id() -> std::result::Result<String, RuntimeLogHostError> {
301 let session_id = std::env::var(ENV_RUNTIME_SESSION_ID)
302 .unwrap_or_default()
303 .trim()
304 .to_string();
305 if session_id.is_empty() {
306 return Err(RuntimeLogHostError::Env(format!(
307 "runtime session: {ENV_RUNTIME_SESSION_ID} is not set"
308 )));
309 }
310 Ok(session_id)
311}
312
313fn timestamp_now() -> prost_types::Timestamp {
314 let now = SystemTime::now()
315 .duration_since(UNIX_EPOCH)
316 .expect("unix epoch");
317 prost_types::Timestamp {
318 seconds: now.as_secs() as i64,
319 nanos: now.subsec_nanos() as i32,
320 }
321}