1use dashmap::DashMap;
7use once_cell::sync::Lazy;
8use serde_json::Value;
9use std::collections::{HashSet, VecDeque};
10use std::sync::{Arc, Once, RwLock};
11use tokio::io::{AsyncBufReadExt, AsyncSeekExt, BufReader};
12use tokio::sync::broadcast;
13use tokio::task::JoinHandle;
14use tokio::time::{sleep, Duration, Instant};
15use tracing::{debug, warn};
16
17use super::client::TunnelClientHandle;
18use super::types::{TunnelChannel, TunnelEnvelope};
19use crate::metrics::MetricsRegistry;
20use crate::telemetry::{TelemetryClient, TelemetryEvent};
21
22const LOG_CHANNEL_BUFFER: usize = 2048;
23const LOG_CACHE_SIZE: usize = 1000;
24const DEFAULT_BACKFILL: usize = 200;
25
/// A single normalized log record flowing through the in-process log stream.
///
/// Produced by the access/WAF/system publishers in this module and consumed
/// by the tunnel streaming and telemetry services.
#[derive(Clone, Debug)]
pub struct LogStreamEntry {
    // Random per-entry identifier (random u64 rendered as a string; not
    // guaranteed globally unique).
    pub id: String,
    // Correlation id of the originating request, when known.
    pub request_id: Option<String>,
    // Creation time as unix-epoch milliseconds (clamped at 0).
    pub timestamp_ms: u64,
    // Creation time rendered as an RFC 3339 string.
    pub timestamp: String,
    // Producer category, e.g. "access", "waf", "system".
    pub source: String,
    // Severity label, e.g. "info", "warn", "error".
    pub level: String,
    pub message: String,
    // Arbitrary structured extras attached by the producer.
    pub fields: Option<Value>,
    // HTTP request details (set by the access-log publisher).
    pub method: Option<String>,
    pub path: Option<String>,
    pub status_code: Option<u16>,
    pub latency_ms: Option<f64>,
    pub client_ip: Option<String>,
    // First matched WAF rule id (set by the WAF publisher).
    pub rule_id: Option<String>,
}
43
44impl LogStreamEntry {
45 fn new(
46 source: impl Into<String>,
47 level: impl Into<String>,
48 message: impl Into<String>,
49 ) -> Self {
50 let timestamp = chrono::Utc::now();
51 Self {
52 id: fastrand::u64(..).to_string(),
53 request_id: None,
54 timestamp_ms: timestamp.timestamp_millis().max(0) as u64,
55 timestamp: timestamp.to_rfc3339(),
56 source: source.into(),
57 level: level.into(),
58 message: message.into(),
59 fields: None,
60 method: None,
61 path: None,
62 status_code: None,
63 latency_ms: None,
64 client_ip: None,
65 rule_id: None,
66 }
67 }
68
69 fn to_entry_payload(&self) -> Value {
70 let mut entry = serde_json::json!({
71 "id": self.id,
72 "timestamp": self.timestamp,
73 "source": self.source,
74 "level": self.level,
75 "message": self.message,
76 "logTimestamp": self.timestamp_ms,
77 });
78
79 if let Some(request_id) = &self.request_id {
80 entry["requestId"] = Value::String(request_id.clone());
81 }
82 if let Some(fields) = &self.fields {
83 entry["fields"] = fields.clone();
84 }
85 if let Some(method) = &self.method {
86 entry["method"] = Value::String(method.clone());
87 }
88 if let Some(path) = &self.path {
89 entry["path"] = Value::String(path.clone());
90 }
91 if let Some(status_code) = self.status_code {
92 entry["statusCode"] = Value::Number(serde_json::Number::from(status_code as u64));
93 }
94 if let Some(latency_ms) = self.latency_ms {
95 entry["latencyMs"] = Value::Number(
96 serde_json::Number::from_f64(latency_ms)
97 .unwrap_or_else(|| serde_json::Number::from(0)),
98 );
99 }
100 if let Some(client_ip) = &self.client_ip {
101 entry["clientIp"] = Value::String(client_ip.clone());
102 }
103 if let Some(rule_id) = &self.rule_id {
104 entry["ruleId"] = Value::String(rule_id.clone());
105 }
106
107 entry
108 }
109
110 fn flat_fields(&self) -> serde_json::Map<String, Value> {
111 let mut fields = serde_json::Map::new();
112 fields.insert("source".to_string(), Value::String(self.source.clone()));
113 fields.insert("level".to_string(), Value::String(self.level.clone()));
114 fields.insert("message".to_string(), Value::String(self.message.clone()));
115 fields.insert(
116 "logTimestamp".to_string(),
117 Value::Number(self.timestamp_ms.into()),
118 );
119
120 if let Some(request_id) = &self.request_id {
121 fields.insert("requestId".to_string(), Value::String(request_id.clone()));
122 }
123 if let Some(extra) = &self.fields {
124 fields.insert("fields".to_string(), extra.clone());
125 }
126 if let Some(method) = &self.method {
127 fields.insert("method".to_string(), Value::String(method.clone()));
128 }
129 if let Some(path) = &self.path {
130 fields.insert("path".to_string(), Value::String(path.clone()));
131 }
132 if let Some(status_code) = self.status_code {
133 fields.insert(
134 "statusCode".to_string(),
135 Value::Number((status_code as u64).into()),
136 );
137 }
138 if let Some(latency_ms) = self.latency_ms {
139 fields.insert(
140 "latencyMs".to_string(),
141 Value::Number(
142 serde_json::Number::from_f64(latency_ms)
143 .unwrap_or_else(|| serde_json::Number::from(0)),
144 ),
145 );
146 }
147 if let Some(client_ip) = &self.client_ip {
148 fields.insert("clientIp".to_string(), Value::String(client_ip.clone()));
149 }
150 if let Some(rule_id) = &self.rule_id {
151 fields.insert("ruleId".to_string(), Value::String(rule_id.clone()));
152 }
153
154 fields
155 }
156}
157
/// Process-wide fan-out state: a broadcast channel for live subscribers plus
/// a bounded ring buffer of recent entries used to serve backfill requests.
struct LogStreamState {
    sender: broadcast::Sender<LogStreamEntry>,
    // Most recent LOG_CACHE_SIZE entries, oldest at the front.
    buffer: RwLock<VecDeque<LogStreamEntry>>,
}
162
// Lazily-initialized global log stream shared by every publisher and consumer
// in this process.
static LOG_STREAM: Lazy<LogStreamState> = Lazy::new(|| {
    let (sender, _) = broadcast::channel(LOG_CHANNEL_BUFFER);
    LogStreamState {
        sender,
        buffer: RwLock::new(VecDeque::with_capacity(LOG_CACHE_SIZE)),
    }
});
// Ensures the system log tailer tasks are spawned at most once per process.
static LOG_TAILERS_STARTED: Once = Once::new();
171
172fn push_to_buffer(entry: &LogStreamEntry) {
173 let mut buffer = LOG_STREAM
174 .buffer
175 .write()
176 .unwrap_or_else(|poisoned| poisoned.into_inner());
177 if buffer.len() >= LOG_CACHE_SIZE {
178 buffer.pop_front();
179 }
180 buffer.push_back(entry.clone());
181}
182
183pub fn publish_log(entry: LogStreamEntry) {
184 push_to_buffer(&entry);
185 let _ = LOG_STREAM.sender.send(entry);
186}
187
188pub fn publish_internal_log(level: &str, source: &str, message: String) {
189 publish_log(LogStreamEntry::new(source, level, message));
190}
191
192pub fn publish_access_log(
193 method: &str,
194 path: &str,
195 status_code: u16,
196 latency_ms: f64,
197 client_ip: Option<&str>,
198 request_id: Option<&str>,
199) {
200 let level = if status_code >= 500 {
201 "error"
202 } else if status_code >= 400 {
203 "warn"
204 } else {
205 "info"
206 };
207
208 let mut entry = LogStreamEntry::new(
209 "access",
210 level,
211 format!("{} {} status={}", method, path, status_code),
212 );
213 entry.method = Some(method.to_string());
214 entry.path = Some(path.to_string());
215 entry.status_code = Some(status_code);
216 entry.latency_ms = Some(latency_ms);
217 entry.client_ip = client_ip.map(|ip| ip.to_string());
218 entry.request_id = request_id.map(|v| v.to_string());
219
220 publish_log(entry);
221}
222
223pub fn publish_waf_log(
224 rule_ids: &[u32],
225 risk_score: u16,
226 client_ip: Option<&str>,
227 path: Option<&str>,
228 message: String,
229 request_id: Option<&str>,
230) {
231 let level = if risk_score >= 80 {
232 "error"
233 } else if risk_score >= 50 {
234 "warn"
235 } else {
236 "info"
237 };
238
239 let mut entry = LogStreamEntry::new("waf", level, message);
240 if let Some(first) = rule_ids.first() {
241 entry.rule_id = Some(first.to_string());
242 }
243 entry.client_ip = client_ip.map(|ip| ip.to_string());
244 entry.path = path.map(|p| p.to_string());
245 entry.request_id = request_id.map(|v| v.to_string());
246 entry.fields = Some(serde_json::json!({
247 "riskScore": risk_score,
248 "ruleIds": rule_ids,
249 }));
250
251 publish_log(entry);
252}
253
254fn subscribe_logs() -> broadcast::Receiver<LogStreamEntry> {
255 LOG_STREAM.sender.subscribe()
256}
257
258fn recent_logs(limit: usize) -> Vec<LogStreamEntry> {
259 let buffer = LOG_STREAM
260 .buffer
261 .read()
262 .unwrap_or_else(|poisoned| poisoned.into_inner());
263 let take = limit.min(buffer.len());
264 buffer.iter().rev().take(take).cloned().collect()
265}
266
/// Per-session predicate applied to streamed and backfilled entries.
/// A `None` field imposes no constraint; all set fields must match.
#[derive(Clone, Debug, Default)]
struct LogStreamFilter {
    // Allowed `source` values.
    sources: Option<HashSet<String>>,
    // Allowed `level` values.
    levels: Option<HashSet<String>>,
    // Case-insensitive free-text query.
    search: Option<String>,
    // Minimum entry timestamp (unix-epoch ms), inclusive.
    since_ms: Option<u64>,
}
274
275impl LogStreamFilter {
276 fn matches(&self, entry: &LogStreamEntry) -> bool {
277 if let Some(sources) = &self.sources {
278 if !sources.contains(&entry.source) {
279 return false;
280 }
281 }
282
283 if let Some(levels) = &self.levels {
284 if !levels.contains(&entry.level) {
285 return false;
286 }
287 }
288
289 if let Some(since_ms) = self.since_ms {
290 if entry.timestamp_ms < since_ms {
291 return false;
292 }
293 }
294
295 if let Some(search) = &self.search {
296 let search_lower = search.to_lowercase();
297 let mut haystack = entry.message.to_lowercase();
298 if let Some(path) = &entry.path {
299 haystack.push_str(path);
300 }
301 if let Some(client_ip) = &entry.client_ip {
302 haystack.push_str(client_ip);
303 }
304 if let Some(rule_id) = &entry.rule_id {
305 haystack.push_str(rule_id);
306 }
307 if !haystack.contains(&search_lower) {
308 return false;
309 }
310 }
311
312 true
313 }
314}
315
/// A live streaming session: its mutable filter plus the forwarding task.
struct LogSession {
    // Shared with the forwarding task so filter updates apply without
    // restarting the task.
    filter: Arc<RwLock<LogStreamFilter>>,
    // Background task relaying matching entries; aborted on stop.
    task: JoinHandle<()>,
}
320
/// Serves interactive log-streaming sessions over the tunnel `Logs` channel.
pub struct TunnelLogService {
    // Outbound handle used to push log frames back through the tunnel.
    handle: TunnelClientHandle,
    // Active sessions keyed by session id.
    sessions: Arc<DashMap<String, LogSession>>,
    // Records per-message handler latency.
    metrics: Arc<MetricsRegistry>,
}
327
impl TunnelLogService {
    /// Creates a log service bound to a tunnel handle and metrics registry.
    pub fn new(handle: TunnelClientHandle, metrics: Arc<MetricsRegistry>) -> Self {
        Self {
            handle,
            sessions: Arc::new(DashMap::new()),
            metrics,
        }
    }

    /// Main service loop: consumes control messages from the tunnel `Logs`
    /// channel until the channel closes or a shutdown signal arrives, then
    /// tears down all remaining sessions.
    pub async fn run(self, mut shutdown_rx: broadcast::Receiver<()>) {
        // Idempotent: tailers are guarded by a `Once`.
        start_log_tailers();
        let mut rx = self.handle.subscribe_channel(TunnelChannel::Logs);
        loop {
            tokio::select! {
                message = rx.recv() => {
                    match message {
                        Ok(envelope) => {
                            let started = Instant::now();
                            self.handle_message(envelope).await;
                            // Record how long the handler took for this message.
                            self.metrics
                                .tunnel_metrics()
                                .record_handler_latency_ms(
                                    TunnelChannel::Logs,
                                    started.elapsed().as_millis() as u64,
                                );
                        }
                        Err(broadcast::error::RecvError::Lagged(count)) => {
                            // Dropped control messages are logged but not fatal.
                            warn!("Log service lagged by {} messages", count);
                            continue;
                        }
                        Err(broadcast::error::RecvError::Closed) => {
                            warn!("Log service channel closed");
                            break;
                        }
                    }
                }
                _ = shutdown_rx.recv() => {
                    debug!("Log service shutdown signal received");
                    break;
                }
            }
        }

        // Collect ids first: `stop_session` mutates the map, so we avoid
        // removing entries while iterating it.
        let session_ids: Vec<String> = self.sessions.iter().map(|e| e.key().clone()).collect();
        for id in session_ids {
            self.stop_session(&id);
        }
    }

    /// Dispatches a single control envelope by its `type` field.
    ///
    /// The session id may arrive either on the envelope itself or in the raw
    /// payload's `sessionId` field; the envelope takes precedence.
    async fn handle_message(&self, envelope: TunnelEnvelope) {
        let Some(message_type) = envelope.raw.get("type").and_then(|v| v.as_str()) else {
            return;
        };

        let session_id = envelope.session_id.clone().or_else(|| {
            envelope
                .raw
                .get("sessionId")
                .and_then(|v| v.as_str())
                .map(|v| v.to_string())
        });

        match message_type {
            "subscribe" => {
                if let Some(session_id) = session_id {
                    self.start_session(&session_id, &envelope.raw).await;
                } else {
                    warn!("Log subscribe missing sessionId");
                }
            }
            "unsubscribe" => {
                if let Some(session_id) = session_id {
                    self.stop_session(&session_id);
                }
            }
            "filter" => {
                if let Some(session_id) = session_id {
                    self.update_filter(&session_id, &envelope.raw);
                }
            }
            "session-close" | "session-closed" => {
                if let Some(session_id) = session_id {
                    self.stop_session(&session_id);
                }
            }
            _ => {}
        }
    }

    /// Starts (or re-filters) a streaming session.
    ///
    /// A subscribe for an already-known session only replaces its filter.
    /// A new session spawns a forwarding task that relays matching entries to
    /// the tunnel, then optionally sends a backfill of recent cached entries.
    ///
    /// NOTE(review): the live subscription begins before the backfill is
    /// sent, so an entry published in between may be delivered both live and
    /// in the backfill batch — presumably clients dedupe by entry id; confirm.
    async fn start_session(&self, session_id: &str, payload: &Value) {
        if self.sessions.contains_key(session_id) {
            self.update_filter(session_id, payload);
            return;
        }

        let filter = Arc::new(RwLock::new(parse_filter(payload)));
        // Backfill defaults to enabled with DEFAULT_BACKFILL lines unless the
        // payload overrides either knob.
        let backfill_enabled = payload
            .get("backfill")
            .and_then(|v| v.as_bool())
            .unwrap_or(true);
        let backfill_lines = payload
            .get("backfillLines")
            .and_then(|v| v.as_u64())
            .map(|v| v as usize)
            .unwrap_or(DEFAULT_BACKFILL);
        let mut rx = subscribe_logs();
        let handle = self.handle.clone();
        let session_id = session_id.to_string();
        let task_session_id = session_id.clone();
        let filter_ref = Arc::clone(&filter);

        let task = tokio::spawn(async move {
            loop {
                match rx.recv().await {
                    Ok(entry) => {
                        // Re-read the shared filter on every entry so that
                        // `filter` control messages apply immediately.
                        let passes = filter_ref
                            .read()
                            .unwrap_or_else(|poisoned| poisoned.into_inner())
                            .matches(&entry);
                        if !passes {
                            continue;
                        }

                        let mut message = serde_json::json!({
                            "type": "entry",
                            "channel": "logs",
                            "sessionId": task_session_id.as_str(),
                            "timestamp": chrono::Utc::now().to_rfc3339(),
                        });
                        // Merge the entry's flat fields into the frame itself.
                        if let Value::Object(ref mut map) = message {
                            map.extend(entry.flat_fields());
                        }
                        let _ = handle.try_send_json(message);
                    }
                    Err(broadcast::error::RecvError::Lagged(count)) => {
                        debug!("Log stream lagged by {} messages", count);
                    }
                    Err(broadcast::error::RecvError::Closed) => break,
                }
            }
        });

        self.sessions
            .insert(session_id.clone(), LogSession { filter, task });

        if backfill_enabled {
            self.send_backfill(&session_id, backfill_lines).await;
        }
    }

    /// Removes a session and aborts its forwarding task. No-op when unknown.
    fn stop_session(&self, session_id: &str) {
        if let Some((_, session)) = self.sessions.remove(session_id) {
            session.task.abort();
        }
    }

    /// Replaces a session's filter in place; no-op for unknown sessions.
    fn update_filter(&self, session_id: &str, payload: &Value) {
        if let Some(session) = self.sessions.get(session_id) {
            let mut filter = session
                .filter
                .write()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            *filter = parse_filter(payload);
        }
    }

    /// Sends up to `limit` recent cached entries that match the session's
    /// filter as a single "log-batch" frame, followed by a
    /// "backfill-complete" marker. Sends nothing when no entries match or the
    /// session has already disappeared.
    async fn send_backfill(&self, session_id: &str, limit: usize) {
        let entries = recent_logs(limit);
        if entries.is_empty() {
            return;
        }

        // Clone the Arc out of the map entry so the DashMap guard is released
        // before we take the filter's read lock.
        let filter = self
            .sessions
            .get(session_id)
            .map(|session| session.filter.clone());
        let Some(filter) = filter else {
            return;
        };

        let filter_guard = filter
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let filtered: Vec<Value> = entries
            .into_iter()
            .filter(|entry| filter_guard.matches(entry))
            .map(|entry| entry.to_entry_payload())
            .collect();

        if filtered.is_empty() {
            return;
        }

        let batch = serde_json::json!({
            "type": "log-batch",
            "channel": "logs",
            "sessionId": session_id,
            "entries": filtered,
            "timestamp": chrono::Utc::now().to_rfc3339(),
        });

        let _ = self.handle.try_send_json(batch);

        let complete = serde_json::json!({
            "type": "backfill-complete",
            "channel": "logs",
            "sessionId": session_id,
            "count": filtered.len(),
            "sources": serde_json::json!(["access", "waf", "system", "audit", "error"]),
            "timestamp": chrono::Utc::now().to_rfc3339(),
        });
        let _ = self.handle.try_send_json(complete);
    }
}
543
/// Forwards every published log entry to the telemetry backend.
pub struct LogTelemetryService {
    telemetry: Arc<TelemetryClient>,
}
548
549impl LogTelemetryService {
550 pub fn new(telemetry: Arc<TelemetryClient>) -> Self {
551 Self { telemetry }
552 }
553
554 pub async fn run(self) {
555 start_log_tailers();
556 if !self.telemetry.is_enabled() {
557 return;
558 }
559
560 let mut rx = subscribe_logs();
561 loop {
562 match rx.recv().await {
563 Ok(entry) => {
564 let event = TelemetryEvent::LogEntry {
565 request_id: entry.request_id.clone(),
566 id: entry.id.clone(),
567 source: entry.source.clone(),
568 level: entry.level.clone(),
569 message: entry.message.clone(),
570 log_timestamp_ms: entry.timestamp_ms,
571 fields: entry.fields.clone(),
572 method: entry.method.clone(),
573 path: entry.path.clone(),
574 status_code: entry.status_code,
575 latency_ms: entry.latency_ms,
576 client_ip: entry.client_ip.clone(),
577 rule_id: entry.rule_id.clone(),
578 };
579
580 if let Err(err) = self.telemetry.record(event).await {
581 warn!("Failed to record log telemetry: {}", err);
582 }
583 }
584 Err(broadcast::error::RecvError::Lagged(count)) => {
585 debug!("Log telemetry lagged by {} entries", count);
586 }
587 Err(broadcast::error::RecvError::Closed) => break,
588 }
589 }
590 }
591}
592
593fn parse_filter(payload: &Value) -> LogStreamFilter {
594 let filter_value = payload.get("filter").unwrap_or(payload);
595
596 let sources = filter_value
597 .get("sources")
598 .and_then(|v| v.as_array())
599 .map(|arr| {
600 arr.iter()
601 .filter_map(|v| v.as_str().map(|s| s.to_string()))
602 .collect::<HashSet<_>>()
603 });
604
605 let levels = filter_value
606 .get("levels")
607 .and_then(|v| v.as_array())
608 .map(|arr| {
609 arr.iter()
610 .filter_map(|v| v.as_str().map(|s| s.to_string()))
611 .collect::<HashSet<_>>()
612 });
613
614 let search = filter_value
615 .get("search")
616 .and_then(|v| v.as_str())
617 .map(|s| s.to_string());
618
619 let since_ms = filter_value.get("since").and_then(|value| {
620 if let Some(s) = value.as_str() {
621 chrono::DateTime::parse_from_rfc3339(s)
622 .ok()
623 .map(|dt| dt.timestamp_millis().max(0) as u64)
624 } else {
625 value.as_u64().map(|ms| ms)
626 }
627 });
628
629 LogStreamFilter {
630 sources,
631 levels,
632 search,
633 since_ms,
634 }
635}
636
637fn start_log_tailers() {
638 LOG_TAILERS_STARTED.call_once(|| {
639 let kernel_paths = [
640 "/var/log/kern.log",
641 "/var/log/kernel.log",
642 "/var/log/messages",
643 ];
644 let syslog_paths = ["/var/log/syslog", "/var/log/messages"];
645
646 for path in kernel_paths {
647 if std::path::Path::new(path).exists() {
648 spawn_tailer(path.to_string(), "kernel");
649 }
650 }
651
652 for path in syslog_paths {
653 if std::path::Path::new(path).exists() {
654 spawn_tailer(path.to_string(), "syslog");
655 }
656 }
657 });
658}
659
660fn spawn_tailer(path: String, subsource: &'static str) {
661 tokio::spawn(async move {
662 if let Err(err) = tail_file(path.clone(), subsource).await {
663 warn!("Failed to tail {}: {}", path, err);
664 }
665 });
666}
667
668async fn tail_file(path: String, subsource: &'static str) -> Result<(), String> {
669 let file = tokio::fs::File::open(&path)
670 .await
671 .map_err(|err| format!("open error: {}", err))?;
672
673 let mut last_metadata = file.metadata().await.map_err(|e| e.to_string())?;
675
676 let mut reader = BufReader::new(file);
677 reader
678 .seek(std::io::SeekFrom::End(0))
679 .await
680 .map_err(|err| format!("seek error: {}", err))?;
681
682 let mut last_rotation_check = Instant::now();
683
684 loop {
685 let mut line = String::new();
686 let bytes = reader
687 .read_line(&mut line)
688 .await
689 .map_err(|err| format!("read error: {}", err))?;
690
691 if bytes == 0 {
692 if last_rotation_check.elapsed() > Duration::from_secs(5) {
694 if let Ok(current_metadata) = tokio::fs::metadata(&path).await {
695 let rotated = if let (Some(old_ino), Some(new_ino)) =
696 (get_inode(&last_metadata), get_inode(¤t_metadata))
697 {
698 old_ino != new_ino
699 } else {
700 current_metadata.len() < last_metadata.len()
702 };
703
704 if rotated {
705 debug!("Log file {} rotated, re-opening", path);
706 let new_file = tokio::fs::File::open(&path)
707 .await
708 .map_err(|err| format!("re-open error: {}", err))?;
709 last_metadata = new_file.metadata().await.map_err(|e| e.to_string())?;
710 reader = BufReader::new(new_file);
711 }
713 }
714 last_rotation_check = Instant::now();
715 }
716
717 sleep(Duration::from_millis(250)).await;
718 continue;
719 }
720
721 let trimmed = line.trim();
722 if trimmed.is_empty() {
723 continue;
724 }
725
726 let level = infer_level(trimmed);
727 let mut entry = LogStreamEntry::new("system", level, trimmed.to_string());
728 entry.fields = Some(serde_json::json!({
729 "subsource": subsource,
730 "path": path,
731 }));
732 publish_log(entry);
733 }
734}
735
/// Returns the file's inode number; used by `tail_file` to detect rotation.
#[cfg(unix)]
fn get_inode(metadata: &std::fs::Metadata) -> Option<u64> {
    use std::os::unix::fs::MetadataExt;
    Some(metadata.ino())
}

/// Non-unix platforms expose no inode; callers fall back to size comparison.
#[cfg(not(unix))]
fn get_inode(_metadata: &std::fs::Metadata) -> Option<u64> {
    None
}
746
/// Heuristically classifies a raw system-log line into a severity label.
///
/// Keyword checks run from most to least severe and the first match wins;
/// lines matching no keyword default to "info". Matching is case-insensitive.
fn infer_level(line: &str) -> &'static str {
    let lower = line.to_lowercase();
    let rules: [(&[&str], &'static str); 5] = [
        (&["fatal", "panic"], "fatal"),
        (&["error", "failed"], "error"),
        (&["warn"], "warn"),
        (&["debug"], "debug"),
        (&["trace"], "trace"),
    ];

    for (needles, level) in rules {
        if needles.iter().any(|needle| lower.contains(needle)) {
            return level;
        }
    }
    "info"
}