// synapse_pingora/tunnel/logs.rs
1//! Log streaming service for Signal Horizon tunnel.
2//!
3//! Subscribes to log stream requests from Signal Horizon and
4//! forwards filtered log entries over the tunnel connection.
5
6use dashmap::DashMap;
7use once_cell::sync::Lazy;
8use serde_json::Value;
9use std::collections::{HashSet, VecDeque};
10use std::sync::{Arc, Once, RwLock};
11use tokio::io::{AsyncBufReadExt, AsyncSeekExt, BufReader};
12use tokio::sync::broadcast;
13use tokio::task::JoinHandle;
14use tokio::time::{sleep, Duration, Instant};
15use tracing::{debug, warn};
16
17use super::client::TunnelClientHandle;
18use super::types::{TunnelChannel, TunnelEnvelope};
19use crate::metrics::MetricsRegistry;
20use crate::telemetry::{TelemetryClient, TelemetryEvent};
21
/// Capacity of the broadcast channel fanning entries out to subscribers;
/// slow receivers past this depth observe `RecvError::Lagged`.
const LOG_CHANNEL_BUFFER: usize = 2048;
/// Maximum number of entries retained in the in-memory backfill cache.
const LOG_CACHE_SIZE: usize = 1000;
/// Number of cached entries replayed to a new session when the subscribe
/// payload does not specify `backfillLines`.
const DEFAULT_BACKFILL: usize = 200;
25
/// One log line flowing through the streaming pipeline.
///
/// Produced by the access/WAF/internal publishers and the system log tailers;
/// consumed by tunnel log sessions and the telemetry forwarder.
#[derive(Clone, Debug)]
pub struct LogStreamEntry {
    /// Random numeric id (stringified), assigned at creation.
    pub id: String,
    /// Correlation id of the originating request, when known.
    pub request_id: Option<String>,
    /// Creation time in milliseconds since the Unix epoch (clamped to >= 0).
    pub timestamp_ms: u64,
    /// Creation time as an RFC 3339 string.
    pub timestamp: String,
    /// Logical origin, e.g. "access", "waf", "system".
    pub source: String,
    /// Severity string, e.g. "info", "warn", "error".
    pub level: String,
    /// Human-readable log line.
    pub message: String,
    /// Optional structured extras (JSON), e.g. WAF rule details.
    pub fields: Option<Value>,
    /// HTTP method, set for access-log entries.
    pub method: Option<String>,
    /// Request path, when applicable.
    pub path: Option<String>,
    /// HTTP response status, set for access-log entries.
    pub status_code: Option<u16>,
    /// Request latency in milliseconds, set for access-log entries.
    pub latency_ms: Option<f64>,
    /// Client IP address, when known.
    pub client_ip: Option<String>,
    /// First matching WAF rule id, when applicable.
    pub rule_id: Option<String>,
}
43
44impl LogStreamEntry {
45    fn new(
46        source: impl Into<String>,
47        level: impl Into<String>,
48        message: impl Into<String>,
49    ) -> Self {
50        let timestamp = chrono::Utc::now();
51        Self {
52            id: fastrand::u64(..).to_string(),
53            request_id: None,
54            timestamp_ms: timestamp.timestamp_millis().max(0) as u64,
55            timestamp: timestamp.to_rfc3339(),
56            source: source.into(),
57            level: level.into(),
58            message: message.into(),
59            fields: None,
60            method: None,
61            path: None,
62            status_code: None,
63            latency_ms: None,
64            client_ip: None,
65            rule_id: None,
66        }
67    }
68
69    fn to_entry_payload(&self) -> Value {
70        let mut entry = serde_json::json!({
71            "id": self.id,
72            "timestamp": self.timestamp,
73            "source": self.source,
74            "level": self.level,
75            "message": self.message,
76            "logTimestamp": self.timestamp_ms,
77        });
78
79        if let Some(request_id) = &self.request_id {
80            entry["requestId"] = Value::String(request_id.clone());
81        }
82        if let Some(fields) = &self.fields {
83            entry["fields"] = fields.clone();
84        }
85        if let Some(method) = &self.method {
86            entry["method"] = Value::String(method.clone());
87        }
88        if let Some(path) = &self.path {
89            entry["path"] = Value::String(path.clone());
90        }
91        if let Some(status_code) = self.status_code {
92            entry["statusCode"] = Value::Number(serde_json::Number::from(status_code as u64));
93        }
94        if let Some(latency_ms) = self.latency_ms {
95            entry["latencyMs"] = Value::Number(
96                serde_json::Number::from_f64(latency_ms)
97                    .unwrap_or_else(|| serde_json::Number::from(0)),
98            );
99        }
100        if let Some(client_ip) = &self.client_ip {
101            entry["clientIp"] = Value::String(client_ip.clone());
102        }
103        if let Some(rule_id) = &self.rule_id {
104            entry["ruleId"] = Value::String(rule_id.clone());
105        }
106
107        entry
108    }
109
110    fn flat_fields(&self) -> serde_json::Map<String, Value> {
111        let mut fields = serde_json::Map::new();
112        fields.insert("source".to_string(), Value::String(self.source.clone()));
113        fields.insert("level".to_string(), Value::String(self.level.clone()));
114        fields.insert("message".to_string(), Value::String(self.message.clone()));
115        fields.insert(
116            "logTimestamp".to_string(),
117            Value::Number(self.timestamp_ms.into()),
118        );
119
120        if let Some(request_id) = &self.request_id {
121            fields.insert("requestId".to_string(), Value::String(request_id.clone()));
122        }
123        if let Some(extra) = &self.fields {
124            fields.insert("fields".to_string(), extra.clone());
125        }
126        if let Some(method) = &self.method {
127            fields.insert("method".to_string(), Value::String(method.clone()));
128        }
129        if let Some(path) = &self.path {
130            fields.insert("path".to_string(), Value::String(path.clone()));
131        }
132        if let Some(status_code) = self.status_code {
133            fields.insert(
134                "statusCode".to_string(),
135                Value::Number((status_code as u64).into()),
136            );
137        }
138        if let Some(latency_ms) = self.latency_ms {
139            fields.insert(
140                "latencyMs".to_string(),
141                Value::Number(
142                    serde_json::Number::from_f64(latency_ms)
143                        .unwrap_or_else(|| serde_json::Number::from(0)),
144                ),
145            );
146        }
147        if let Some(client_ip) = &self.client_ip {
148            fields.insert("clientIp".to_string(), Value::String(client_ip.clone()));
149        }
150        if let Some(rule_id) = &self.rule_id {
151            fields.insert("ruleId".to_string(), Value::String(rule_id.clone()));
152        }
153
154        fields
155    }
156}
157
/// Global fan-out state: a broadcast sender for live delivery plus a bounded
/// ring buffer of recent entries used for session backfill.
struct LogStreamState {
    /// Live fan-out; subscribers are created via `sender.subscribe()`.
    sender: broadcast::Sender<LogStreamEntry>,
    /// Recent-entries cache; writers evict from the front once full.
    buffer: RwLock<VecDeque<LogStreamEntry>>,
}
162
/// Lazily-initialized global log stream shared by all publishers and consumers.
static LOG_STREAM: Lazy<LogStreamState> = Lazy::new(|| {
    let (sender, _) = broadcast::channel(LOG_CHANNEL_BUFFER);
    LogStreamState {
        sender,
        buffer: RwLock::new(VecDeque::with_capacity(LOG_CACHE_SIZE)),
    }
});
/// Guards one-time startup of the system log tailer tasks.
static LOG_TAILERS_STARTED: Once = Once::new();
171
172fn push_to_buffer(entry: &LogStreamEntry) {
173    let mut buffer = LOG_STREAM
174        .buffer
175        .write()
176        .unwrap_or_else(|poisoned| poisoned.into_inner());
177    if buffer.len() >= LOG_CACHE_SIZE {
178        buffer.pop_front();
179    }
180    buffer.push_back(entry.clone());
181}
182
183pub fn publish_log(entry: LogStreamEntry) {
184    push_to_buffer(&entry);
185    let _ = LOG_STREAM.sender.send(entry);
186}
187
188pub fn publish_internal_log(level: &str, source: &str, message: String) {
189    publish_log(LogStreamEntry::new(source, level, message));
190}
191
192pub fn publish_access_log(
193    method: &str,
194    path: &str,
195    status_code: u16,
196    latency_ms: f64,
197    client_ip: Option<&str>,
198    request_id: Option<&str>,
199) {
200    let level = if status_code >= 500 {
201        "error"
202    } else if status_code >= 400 {
203        "warn"
204    } else {
205        "info"
206    };
207
208    let mut entry = LogStreamEntry::new(
209        "access",
210        level,
211        format!("{} {} status={}", method, path, status_code),
212    );
213    entry.method = Some(method.to_string());
214    entry.path = Some(path.to_string());
215    entry.status_code = Some(status_code);
216    entry.latency_ms = Some(latency_ms);
217    entry.client_ip = client_ip.map(|ip| ip.to_string());
218    entry.request_id = request_id.map(|v| v.to_string());
219
220    publish_log(entry);
221}
222
223pub fn publish_waf_log(
224    rule_ids: &[u32],
225    risk_score: u16,
226    client_ip: Option<&str>,
227    path: Option<&str>,
228    message: String,
229    request_id: Option<&str>,
230) {
231    let level = if risk_score >= 80 {
232        "error"
233    } else if risk_score >= 50 {
234        "warn"
235    } else {
236        "info"
237    };
238
239    let mut entry = LogStreamEntry::new("waf", level, message);
240    if let Some(first) = rule_ids.first() {
241        entry.rule_id = Some(first.to_string());
242    }
243    entry.client_ip = client_ip.map(|ip| ip.to_string());
244    entry.path = path.map(|p| p.to_string());
245    entry.request_id = request_id.map(|v| v.to_string());
246    entry.fields = Some(serde_json::json!({
247        "riskScore": risk_score,
248        "ruleIds": rule_ids,
249    }));
250
251    publish_log(entry);
252}
253
/// Opens a fresh broadcast receiver on the global log stream; only entries
/// published after this call are delivered to it.
fn subscribe_logs() -> broadcast::Receiver<LogStreamEntry> {
    LOG_STREAM.sender.subscribe()
}
257
258fn recent_logs(limit: usize) -> Vec<LogStreamEntry> {
259    let buffer = LOG_STREAM
260        .buffer
261        .read()
262        .unwrap_or_else(|poisoned| poisoned.into_inner());
263    let take = limit.min(buffer.len());
264    buffer.iter().rev().take(take).cloned().collect()
265}
266
/// Per-session filter criteria parsed from subscribe/filter payloads.
/// `None` for any criterion means "no restriction".
#[derive(Clone, Debug, Default)]
struct LogStreamFilter {
    /// Allowed `source` values, if restricted.
    sources: Option<HashSet<String>>,
    /// Allowed `level` values, if restricted.
    levels: Option<HashSet<String>>,
    /// Substring searched across message/path/client IP/rule id.
    search: Option<String>,
    /// Inclusive minimum entry timestamp, in milliseconds since epoch.
    since_ms: Option<u64>,
}
274
275impl LogStreamFilter {
276    fn matches(&self, entry: &LogStreamEntry) -> bool {
277        if let Some(sources) = &self.sources {
278            if !sources.contains(&entry.source) {
279                return false;
280            }
281        }
282
283        if let Some(levels) = &self.levels {
284            if !levels.contains(&entry.level) {
285                return false;
286            }
287        }
288
289        if let Some(since_ms) = self.since_ms {
290            if entry.timestamp_ms < since_ms {
291                return false;
292            }
293        }
294
295        if let Some(search) = &self.search {
296            let search_lower = search.to_lowercase();
297            let mut haystack = entry.message.to_lowercase();
298            if let Some(path) = &entry.path {
299                haystack.push_str(path);
300            }
301            if let Some(client_ip) = &entry.client_ip {
302                haystack.push_str(client_ip);
303            }
304            if let Some(rule_id) = &entry.rule_id {
305                haystack.push_str(rule_id);
306            }
307            if !haystack.contains(&search_lower) {
308                return false;
309            }
310        }
311
312        true
313    }
314}
315
/// A live log-streaming session: the shared filter plus its forwarding task.
struct LogSession {
    /// Filter shared with the spawned forwarder; replaced in place when a
    /// "filter" control message arrives.
    filter: Arc<RwLock<LogStreamFilter>>,
    /// Forwarding task handle; aborted when the session is stopped.
    task: JoinHandle<()>,
}
320
/// Tunnel service for log streaming.
///
/// Listens on the `Logs` tunnel channel for subscribe/unsubscribe/filter
/// control messages and manages one forwarding session per `sessionId`.
pub struct TunnelLogService {
    /// Handle used both to receive control messages and to send frames back.
    handle: TunnelClientHandle,
    /// Active sessions keyed by session id.
    sessions: Arc<DashMap<String, LogSession>>,
    /// Registry used to record per-message handler latency.
    metrics: Arc<MetricsRegistry>,
}
327
impl TunnelLogService {
    /// Creates the service; nothing is processed until [`Self::run`] is awaited.
    pub fn new(handle: TunnelClientHandle, metrics: Arc<MetricsRegistry>) -> Self {
        Self {
            handle,
            sessions: Arc::new(DashMap::new()),
            metrics,
        }
    }

    /// Service loop: consumes control messages from the `Logs` tunnel channel
    /// until the channel closes or `shutdown_rx` fires, recording handler
    /// latency per message. All active sessions are stopped on exit.
    pub async fn run(self, mut shutdown_rx: broadcast::Receiver<()>) {
        // Guarded by a `Once` internally, so repeated calls are harmless.
        start_log_tailers();
        let mut rx = self.handle.subscribe_channel(TunnelChannel::Logs);
        loop {
            tokio::select! {
                message = rx.recv() => {
                    match message {
                        Ok(envelope) => {
                            let started = Instant::now();
                            self.handle_message(envelope).await;
                            self.metrics
                                .tunnel_metrics()
                                .record_handler_latency_ms(
                                    TunnelChannel::Logs,
                                    started.elapsed().as_millis() as u64,
                                );
                        }
                        Err(broadcast::error::RecvError::Lagged(count)) => {
                            // Some control messages were dropped; keep serving.
                            warn!("Log service lagged by {} messages", count);
                            continue;
                        }
                        Err(broadcast::error::RecvError::Closed) => {
                            warn!("Log service channel closed");
                            break;
                        }
                    }
                }
                _ = shutdown_rx.recv() => {
                    debug!("Log service shutdown signal received");
                    break;
                }
            }
        }

        // Stop all active log sessions
        let session_ids: Vec<String> = self.sessions.iter().map(|e| e.key().clone()).collect();
        for id in session_ids {
            self.stop_session(&id);
        }
    }

    /// Dispatches one envelope by its `type` field. The session id is taken
    /// from the envelope itself, falling back to a `sessionId` payload key.
    /// Unknown message types are silently ignored.
    async fn handle_message(&self, envelope: TunnelEnvelope) {
        let Some(message_type) = envelope.raw.get("type").and_then(|v| v.as_str()) else {
            return;
        };

        let session_id = envelope.session_id.clone().or_else(|| {
            envelope
                .raw
                .get("sessionId")
                .and_then(|v| v.as_str())
                .map(|v| v.to_string())
        });

        match message_type {
            "subscribe" => {
                if let Some(session_id) = session_id {
                    self.start_session(&session_id, &envelope.raw).await;
                } else {
                    warn!("Log subscribe missing sessionId");
                }
            }
            "unsubscribe" => {
                if let Some(session_id) = session_id {
                    self.stop_session(&session_id);
                }
            }
            "filter" => {
                if let Some(session_id) = session_id {
                    self.update_filter(&session_id, &envelope.raw);
                }
            }
            "session-close" | "session-closed" => {
                if let Some(session_id) = session_id {
                    self.stop_session(&session_id);
                }
            }
            _ => {}
        }
    }

    /// Starts a forwarding session for `session_id`; if one already exists,
    /// only its filter is refreshed. Spawns a task that pushes matching live
    /// entries over the tunnel, then optionally replays cached entries
    /// ("backfill", enabled by default, up to `backfillLines`).
    ///
    /// NOTE(review): the live forwarder is spawned before the backfill batch
    /// is sent, so an entry published in that window can be delivered twice
    /// (once live, once in the batch) — confirm the consumer dedupes by id.
    async fn start_session(&self, session_id: &str, payload: &Value) {
        if self.sessions.contains_key(session_id) {
            self.update_filter(session_id, payload);
            return;
        }

        let filter = Arc::new(RwLock::new(parse_filter(payload)));
        let backfill_enabled = payload
            .get("backfill")
            .and_then(|v| v.as_bool())
            .unwrap_or(true);
        let backfill_lines = payload
            .get("backfillLines")
            .and_then(|v| v.as_u64())
            .map(|v| v as usize)
            .unwrap_or(DEFAULT_BACKFILL);
        let mut rx = subscribe_logs();
        let handle = self.handle.clone();
        let session_id = session_id.to_string();
        let task_session_id = session_id.clone();
        let filter_ref = Arc::clone(&filter);

        let task = tokio::spawn(async move {
            loop {
                match rx.recv().await {
                    Ok(entry) => {
                        // Re-read the filter per entry so in-place updates
                        // from "filter" messages take effect immediately.
                        let passes = filter_ref
                            .read()
                            .unwrap_or_else(|poisoned| poisoned.into_inner())
                            .matches(&entry);
                        if !passes {
                            continue;
                        }

                        let mut message = serde_json::json!({
                            "type": "entry",
                            "channel": "logs",
                            "sessionId": task_session_id.as_str(),
                            "timestamp": chrono::Utc::now().to_rfc3339(),
                        });
                        // Merge the entry's flattened fields into the frame.
                        if let Value::Object(ref mut map) = message {
                            map.extend(entry.flat_fields());
                        }
                        let _ = handle.try_send_json(message);
                    }
                    Err(broadcast::error::RecvError::Lagged(count)) => {
                        // Entries were dropped for this slow session; keep going.
                        debug!("Log stream lagged by {} messages", count);
                    }
                    Err(broadcast::error::RecvError::Closed) => break,
                }
            }
        });

        self.sessions
            .insert(session_id.clone(), LogSession { filter, task });

        if backfill_enabled {
            self.send_backfill(&session_id, backfill_lines).await;
        }
    }

    /// Removes the session and aborts its forwarding task. No-op for
    /// unknown session ids.
    fn stop_session(&self, session_id: &str) {
        if let Some((_, session)) = self.sessions.remove(session_id) {
            session.task.abort();
        }
    }

    /// Replaces the session's filter with one parsed from `payload`.
    /// Unknown session ids are ignored.
    fn update_filter(&self, session_id: &str, payload: &Value) {
        if let Some(session) = self.sessions.get(session_id) {
            let mut filter = session
                .filter
                .write()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            *filter = parse_filter(payload);
        }
    }

    /// Sends up to `limit` cached entries that pass the session's current
    /// filter as one `log-batch` frame, followed by a `backfill-complete`
    /// marker carrying the delivered count.
    ///
    /// NOTE(review): `recent_logs` yields entries newest-first, so the batch
    /// is in reverse chronological order — confirm the consumer re-sorts.
    async fn send_backfill(&self, session_id: &str, limit: usize) {
        let entries = recent_logs(limit);
        if entries.is_empty() {
            return;
        }

        let filter = self
            .sessions
            .get(session_id)
            .map(|session| session.filter.clone());
        let Some(filter) = filter else {
            return;
        };

        let filter_guard = filter
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let filtered: Vec<Value> = entries
            .into_iter()
            .filter(|entry| filter_guard.matches(entry))
            .map(|entry| entry.to_entry_payload())
            .collect();

        if filtered.is_empty() {
            return;
        }

        let batch = serde_json::json!({
            "type": "log-batch",
            "channel": "logs",
            "sessionId": session_id,
            "entries": filtered,
            "timestamp": chrono::Utc::now().to_rfc3339(),
        });

        let _ = self.handle.try_send_json(batch);

        let complete = serde_json::json!({
            "type": "backfill-complete",
            "channel": "logs",
            "sessionId": session_id,
            "count": filtered.len(),
            "sources": serde_json::json!(["access", "waf", "system", "audit", "error"]),
            "timestamp": chrono::Utc::now().to_rfc3339(),
        });
        let _ = self.handle.try_send_json(complete);
    }
}
543
/// Telemetry-forwarding service for log entries.
///
/// Subscribes to the global log stream and mirrors every entry to the
/// telemetry backend as a `TelemetryEvent::LogEntry`.
pub struct LogTelemetryService {
    telemetry: Arc<TelemetryClient>,
}
548
549impl LogTelemetryService {
550    pub fn new(telemetry: Arc<TelemetryClient>) -> Self {
551        Self { telemetry }
552    }
553
554    pub async fn run(self) {
555        start_log_tailers();
556        if !self.telemetry.is_enabled() {
557            return;
558        }
559
560        let mut rx = subscribe_logs();
561        loop {
562            match rx.recv().await {
563                Ok(entry) => {
564                    let event = TelemetryEvent::LogEntry {
565                        request_id: entry.request_id.clone(),
566                        id: entry.id.clone(),
567                        source: entry.source.clone(),
568                        level: entry.level.clone(),
569                        message: entry.message.clone(),
570                        log_timestamp_ms: entry.timestamp_ms,
571                        fields: entry.fields.clone(),
572                        method: entry.method.clone(),
573                        path: entry.path.clone(),
574                        status_code: entry.status_code,
575                        latency_ms: entry.latency_ms,
576                        client_ip: entry.client_ip.clone(),
577                        rule_id: entry.rule_id.clone(),
578                    };
579
580                    if let Err(err) = self.telemetry.record(event).await {
581                        warn!("Failed to record log telemetry: {}", err);
582                    }
583                }
584                Err(broadcast::error::RecvError::Lagged(count)) => {
585                    debug!("Log telemetry lagged by {} entries", count);
586                }
587                Err(broadcast::error::RecvError::Closed) => break,
588            }
589        }
590    }
591}
592
593fn parse_filter(payload: &Value) -> LogStreamFilter {
594    let filter_value = payload.get("filter").unwrap_or(payload);
595
596    let sources = filter_value
597        .get("sources")
598        .and_then(|v| v.as_array())
599        .map(|arr| {
600            arr.iter()
601                .filter_map(|v| v.as_str().map(|s| s.to_string()))
602                .collect::<HashSet<_>>()
603        });
604
605    let levels = filter_value
606        .get("levels")
607        .and_then(|v| v.as_array())
608        .map(|arr| {
609            arr.iter()
610                .filter_map(|v| v.as_str().map(|s| s.to_string()))
611                .collect::<HashSet<_>>()
612        });
613
614    let search = filter_value
615        .get("search")
616        .and_then(|v| v.as_str())
617        .map(|s| s.to_string());
618
619    let since_ms = filter_value.get("since").and_then(|value| {
620        if let Some(s) = value.as_str() {
621            chrono::DateTime::parse_from_rfc3339(s)
622                .ok()
623                .map(|dt| dt.timestamp_millis().max(0) as u64)
624        } else {
625            value.as_u64().map(|ms| ms)
626        }
627    });
628
629    LogStreamFilter {
630        sources,
631        levels,
632        search,
633        since_ms,
634    }
635}
636
637fn start_log_tailers() {
638    LOG_TAILERS_STARTED.call_once(|| {
639        let kernel_paths = [
640            "/var/log/kern.log",
641            "/var/log/kernel.log",
642            "/var/log/messages",
643        ];
644        let syslog_paths = ["/var/log/syslog", "/var/log/messages"];
645
646        for path in kernel_paths {
647            if std::path::Path::new(path).exists() {
648                spawn_tailer(path.to_string(), "kernel");
649            }
650        }
651
652        for path in syslog_paths {
653            if std::path::Path::new(path).exists() {
654                spawn_tailer(path.to_string(), "syslog");
655            }
656        }
657    });
658}
659
660fn spawn_tailer(path: String, subsource: &'static str) {
661    tokio::spawn(async move {
662        if let Err(err) = tail_file(path.clone(), subsource).await {
663            warn!("Failed to tail {}: {}", path, err);
664        }
665    });
666}
667
668async fn tail_file(path: String, subsource: &'static str) -> Result<(), String> {
669    let file = tokio::fs::File::open(&path)
670        .await
671        .map_err(|err| format!("open error: {}", err))?;
672
673    // Get initial metadata to track rotation
674    let mut last_metadata = file.metadata().await.map_err(|e| e.to_string())?;
675
676    let mut reader = BufReader::new(file);
677    reader
678        .seek(std::io::SeekFrom::End(0))
679        .await
680        .map_err(|err| format!("seek error: {}", err))?;
681
682    let mut last_rotation_check = Instant::now();
683
684    loop {
685        let mut line = String::new();
686        let bytes = reader
687            .read_line(&mut line)
688            .await
689            .map_err(|err| format!("read error: {}", err))?;
690
691        if bytes == 0 {
692            // Periodically check for rotation (every 5 seconds or when idle)
693            if last_rotation_check.elapsed() > Duration::from_secs(5) {
694                if let Ok(current_metadata) = tokio::fs::metadata(&path).await {
695                    let rotated = if let (Some(old_ino), Some(new_ino)) =
696                        (get_inode(&last_metadata), get_inode(&current_metadata))
697                    {
698                        old_ino != new_ino
699                    } else {
700                        // Fallback to size/mtime if inodes not available or comparable
701                        current_metadata.len() < last_metadata.len()
702                    };
703
704                    if rotated {
705                        debug!("Log file {} rotated, re-opening", path);
706                        let new_file = tokio::fs::File::open(&path)
707                            .await
708                            .map_err(|err| format!("re-open error: {}", err))?;
709                        last_metadata = new_file.metadata().await.map_err(|e| e.to_string())?;
710                        reader = BufReader::new(new_file);
711                        // Start reading from the beginning of the new file
712                    }
713                }
714                last_rotation_check = Instant::now();
715            }
716
717            sleep(Duration::from_millis(250)).await;
718            continue;
719        }
720
721        let trimmed = line.trim();
722        if trimmed.is_empty() {
723            continue;
724        }
725
726        let level = infer_level(trimmed);
727        let mut entry = LogStreamEntry::new("system", level, trimmed.to_string());
728        entry.fields = Some(serde_json::json!({
729            "subsource": subsource,
730            "path": path,
731        }));
732        publish_log(entry);
733    }
734}
735
/// Returns the file's inode number, used to detect log rotation.
#[cfg(unix)]
fn get_inode(metadata: &std::fs::Metadata) -> Option<u64> {
    use std::os::unix::fs::MetadataExt;
    Some(metadata.ino())
}

/// Non-Unix platforms have no inode; callers fall back to size comparison.
#[cfg(not(unix))]
fn get_inode(_metadata: &std::fs::Metadata) -> Option<u64> {
    None
}
746
/// Heuristically infers a severity for a raw system log line by scanning for
/// keywords, case-insensitively. Rules are checked in priority order, so a
/// line containing both "error" and "warn" maps to the more severe level;
/// lines matching nothing default to "info".
fn infer_level(line: &str) -> &'static str {
    const RULES: &[(&[&str], &str)] = &[
        (&["fatal", "panic"], "fatal"),
        (&["error", "failed"], "error"),
        (&["warn"], "warn"),
        (&["debug"], "debug"),
        (&["trace"], "trace"),
    ];

    let lower = line.to_lowercase();
    RULES
        .iter()
        .find(|(keywords, _)| keywords.iter().any(|k| lower.contains(*k)))
        .map(|(_, level)| *level)
        .unwrap_or("info")
}