1use chrono::Local;
7use gcp_auth::AuthenticationManager;
8
9use serde::Serialize;
10use std::time::Duration;
11use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
12use tracing::{Event, Subscriber};
13use tracing_subscriber::layer::Context;
14use tracing_subscriber::registry::LookupSpan;
15use tracing_subscriber::Layer;
16
17#[derive(Debug, Serialize)]
19pub struct ContainerMetadata {
20 pub id: String,
22 pub name: String,
24}
25
26#[derive(Debug, Serialize)]
28pub struct InstanceMetadata {
29 pub name: String,
31 pub id: String,
33 pub zone: String,
35 pub project_id: String,
37}
38
39#[derive(Debug, Serialize)]
41pub struct LogContextMetadata {
42 pub container: Option<ContainerMetadata>,
44 pub instance: InstanceMetadata,
46}
47
48#[derive(Debug, Serialize)]
50struct GcpLogEntry {
51 message: String,
53 severity: String,
55}
56
57pub struct GcpLoggingLayer {
59 channel: UnboundedSender<GcpLogEntry>,
61}
62
63impl GcpLoggingLayer {
64 pub async fn new(project_id: String) -> Result<Self, Box<dyn std::error::Error>> {
72 let token = match AuthenticationManager::new().await {
77 Ok(auth) => match auth
78 .get_token(&["https://www.googleapis.com/auth/logging.write"])
79 .await
80 {
81 Ok(tok) => tok.as_str().to_string(),
84 Err(e) => {
85 eprintln!(
86 "Warning: failed to acquire GCP token: {}. Proceeding without auth.",
87 e
88 );
89 String::new()
90 }
91 },
92 Err(e) => {
93 eprintln!("Warning: failed to initialize AuthenticationManager: {}. Proceeding without auth.", e);
94 String::new()
95 }
96 };
97
98 let metadata = match collect_log_metadata(project_id.clone()).await {
102 Ok(m) => m,
103 Err(e) => {
104 eprintln!(
105 "Warning: failed to collect metadata: {}. Using fallback values.",
106 e
107 );
108 LogContextMetadata {
109 container: None,
110 instance: InstanceMetadata {
111 name: "unknown".into(),
112 id: "0".into(),
113 zone: "".into(),
114 project_id,
115 },
116 }
117 }
118 };
119
120 let (channel, mut rx) = unbounded_channel::<GcpLogEntry>();
121 let client = reqwest::Client::new();
122
123 tokio::spawn(async move {
126 let mut warned_no_auth = false;
128 let skip_sending = token.is_empty();
129
130 loop {
131 let log_entry = rx.recv().await;
132 if log_entry.is_none() {
133 tokio::time::sleep(Duration::from_millis(1)).await;
134 continue;
135 }
136 let log_entry = log_entry.unwrap();
137
138 if skip_sending {
139 if !warned_no_auth {
140 eprintln!("Warning: no GCP auth token available; log entries will not be sent. Set up authentication to enable sending.");
141 warned_no_auth = true;
142 }
143 continue;
145 }
146
147 let entry = serde_json::json!({
148 "logName": format!("projects/{}/logs/proxie", metadata.instance.project_id),
149 "resource": {
150 "type": "gce_instance",
151 "labels": {
152 "instance_id": metadata.instance.id,
153 "zone": metadata.instance.zone,
154 "project_id": metadata.instance.project_id
155 }
156 },
157 "severity": log_entry.severity,
158 "jsonPayload": {
159 "message": log_entry.message,
160 "container": metadata.container,
161 "instance": metadata.instance
162 }
163 });
164 let body = serde_json::json!({ "entries": [entry] });
165
166 let mut req = client
168 .post("https://logging.googleapis.com/v2/entries:write")
169 .json(&body);
170 if !token.is_empty() {
171 req = req.bearer_auth(token.as_str());
172 }
173
174 let res = req.send().await;
175 if let Err(e) = res {
176 eprintln!("Failed to send log entry: {}", e);
177 }
178 }
179 });
180
181 Ok(Self { channel })
182 }
183
184 fn map_level_to_severity(level: &tracing::Level) -> &'static str {
192 match *level {
193 tracing::Level::ERROR => "ERROR",
194 tracing::Level::WARN => "WARNING",
195 tracing::Level::INFO => "INFO",
196 tracing::Level::DEBUG => "DEBUG",
197 tracing::Level::TRACE => "DEBUG",
198 }
199 }
200}
201
202impl<S> Layer<S> for GcpLoggingLayer
203where
204 S: Subscriber + for<'a> LookupSpan<'a>,
205{
206 fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
212 let mut message = "**UNDEFINED**".to_string();
213
214 event.record(
215 &mut |field: &tracing::field::Field, value: &dyn std::fmt::Debug| {
216 if field.name() == "message" {
217 message = format!("{:?}", value);
218 }
219 },
220 );
221 let now = Local::now().format("%Y-%m-%d %H:%M:%S,%3f").to_string();
222
223 let metadata = event.metadata();
224 let severity = Self::map_level_to_severity(metadata.level()).to_string();
225 let message = format!(
226 "[{}] {} [{} {}:{}] [{}]",
227 now,
228 severity,
229 metadata.target(),
230 metadata.file().unwrap_or("unknown_file"),
231 metadata.line().unwrap_or(0),
232 message
233 );
234
235 let log_entry = GcpLogEntry { severity, message };
236
237 let result = self.channel.send(log_entry);
238 if result.is_err() {
239 eprintln!("Error {:?}", result);
240 }
241 }
242}
243
244pub async fn collect_log_metadata(
252 project_id: String,
253) -> Result<LogContextMetadata, Box<dyn std::error::Error>> {
254 let client = reqwest::Client::new();
255
256 let container_id = get_container_id();
257 let container_name = std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown".into());
258
259 let container_metadata = container_id.clone().map(|id| ContainerMetadata {
260 id,
261 name: container_name,
262 });
263
264 let instance_name = get_metadata(&client, "instance/name")
265 .await
266 .unwrap_or_default();
267 let instance_id = get_metadata(&client, "instance/id")
268 .await
269 .unwrap_or_default();
270 let zone_path = get_metadata(&client, "instance/zone")
271 .await
272 .unwrap_or_default();
273 let zone = zone_path.split('/').last().unwrap_or("").to_string();
274
275 Ok(LogContextMetadata {
276 container: container_metadata,
277 instance: InstanceMetadata {
278 name: instance_name,
279 id: instance_id,
280 zone,
281 project_id,
282 },
283 })
284}
285
286fn get_container_id() -> Option<String> {
291 std::fs::read_to_string("/proc/self/cgroup")
292 .ok()?
293 .lines()
294 .find_map(|line| {
295 if let Some(pos) = line.rfind('/') {
296 Some(line[pos + 1..].to_string())
297 } else {
298 None
299 }
300 })
301}
302
303async fn get_metadata(client: &reqwest::Client, path: &str) -> Option<String> {
312 let url = format!(
313 "http://metadata.google.internal/computeMetadata/v1/{}",
314 path
315 );
316 client
317 .get(&url)
318 .header("Metadata-Flavor", "Google")
319 .send()
320 .await
321 .ok()?
322 .text()
323 .await
324 .ok()
325}