use super::event::{AuditEvent, EventResult, EventType};
use super::exporter::AuditExporter;
use anyhow::{Context, Result};
use async_trait::async_trait;
use opentelemetry::{
KeyValue,
logs::{AnyValue, LogRecord as _, Logger, LoggerProvider as _, Severity},
};
use opentelemetry_otlp::{LogExporter, WithExportConfig};
use opentelemetry_sdk::{Resource, logs::SdkLoggerProvider};
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::sync::RwLockReadGuard;
fn severity_to_str(severity: Severity) -> &'static str {
match severity {
Severity::Trace | Severity::Trace2 | Severity::Trace3 | Severity::Trace4 => "TRACE",
Severity::Debug | Severity::Debug2 | Severity::Debug3 | Severity::Debug4 => "DEBUG",
Severity::Info | Severity::Info2 | Severity::Info3 | Severity::Info4 => "INFO",
Severity::Warn | Severity::Warn2 | Severity::Warn3 | Severity::Warn4 => "WARN",
Severity::Error | Severity::Error2 | Severity::Error3 | Severity::Error4 => "ERROR",
Severity::Fatal | Severity::Fatal2 | Severity::Fatal3 | Severity::Fatal4 => "FATAL",
}
}
/// Audit exporter that emits audit events as OpenTelemetry log records.
///
/// Instances are created with `OtelExporter::new`, which wires the logger
/// provider to an OTLP log exporter for the given endpoint.
pub struct OtelExporter {
/// Logger provider behind an async `RwLock`: exporting and flushing take the
/// read lock, while closing takes the write lock before shutting it down.
logger_provider: Arc<RwLock<SdkLoggerProvider>>,
/// Endpoint string exactly as passed to `new`; shown in the `Debug` output.
endpoint: String,
}
impl OtelExporter {
    /// Creates an exporter that sends audit events to `endpoint` as OTLP logs
    /// using the tonic transport.
    ///
    /// The logger provider is tagged with the service name `bssh-server` and a
    /// `service.version` attribute taken from `CARGO_PKG_VERSION`, and is
    /// configured with the SDK's simple exporter (`with_simple_exporter`).
    ///
    /// A warning is logged when the endpoint's scheme is not `https`.
    ///
    /// # Errors
    ///
    /// Returns an error if `endpoint` is not a valid URL or if the OTLP log
    /// exporter cannot be built.
    pub fn new(endpoint: &str) -> Result<Self> {
        let parsed = url::Url::parse(endpoint).context("invalid endpoint URL")?;

        // Compare the parsed scheme rather than the raw string prefix: URL
        // schemes are case-insensitive and the parser normalizes them to
        // lower case, so `HTTPS://host` no longer triggers a false warning.
        if parsed.scheme() != "https" {
            tracing::warn!(
                endpoint = %endpoint,
                "OpenTelemetry audit exporter is not using HTTPS. \
                 Audit data will be transmitted unencrypted. \
                 Use HTTPS for production deployments."
            );
        }

        let exporter = LogExporter::builder()
            .with_tonic()
            .with_endpoint(endpoint)
            .build()
            .context("failed to build OTLP log exporter")?;

        let resource = Resource::builder()
            .with_service_name("bssh-server")
            .with_attribute(KeyValue::new("service.version", env!("CARGO_PKG_VERSION")))
            .build();

        let logger_provider = SdkLoggerProvider::builder()
            .with_resource(resource)
            .with_simple_exporter(exporter)
            .build();

        Ok(Self {
            logger_provider: Arc::new(RwLock::new(logger_provider)),
            endpoint: endpoint.to_string(),
        })
    }

    /// Converts `event` into a log record and emits it through `provider`.
    ///
    /// The record body is `"<event type> - <user> - <result>"`. The mandatory
    /// attributes are `event.id`, `event.type`, `session.id`, `user.name` and
    /// `result`. The optional fields `client.ip`, `file.path`,
    /// `file.dest_path`, `file.bytes`, `protocol` and `details` are attached
    /// only when present on the event.
    fn emit_event(&self, provider: &SdkLoggerProvider, event: &AuditEvent) {
        let logger = provider.logger("bssh-audit");
        let mut record = logger.create_log_record();

        let severity = self.event_to_severity(&event.event_type, &event.result);

        // Format the Debug representations once; they are used for both the
        // body and the attributes.
        let event_type = format!("{:?}", event.event_type);
        let result = format!("{:?}", event.result);
        let body = format!("{} - {} - {}", event_type, event.user, result);

        record.set_timestamp(event.timestamp.into());
        record.set_observed_timestamp(std::time::SystemTime::now());
        record.set_severity_number(severity);
        record.set_severity_text(severity_to_str(severity));
        record.set_body(AnyValue::String(body.into()));

        record.add_attribute("event.id", AnyValue::String(event.id.clone().into()));
        record.add_attribute("event.type", AnyValue::String(event_type.into()));
        record.add_attribute(
            "session.id",
            AnyValue::String(event.session_id.clone().into()),
        );
        record.add_attribute("user.name", AnyValue::String(event.user.clone().into()));
        record.add_attribute("result", AnyValue::String(result.into()));

        if let Some(ip) = &event.client_ip {
            record.add_attribute("client.ip", AnyValue::String(ip.to_string().into()));
        }
        if let Some(path) = &event.path {
            record.add_attribute(
                "file.path",
                AnyValue::String(path.display().to_string().into()),
            );
        }
        if let Some(dest_path) = &event.dest_path {
            record.add_attribute(
                "file.dest_path",
                AnyValue::String(dest_path.display().to_string().into()),
            );
        }
        if let Some(bytes) = event.bytes {
            // `AnyValue::Int` carries an `i64`. Saturate rather than use an
            // `as` cast, which would wrap a count above `i64::MAX` to a
            // negative number in the audit trail.
            let bytes = i64::try_from(bytes).unwrap_or(i64::MAX);
            record.add_attribute("file.bytes", AnyValue::Int(bytes));
        }
        if let Some(protocol) = &event.protocol {
            record.add_attribute("protocol", AnyValue::String(protocol.clone().into()));
        }
        if let Some(details) = &event.details {
            record.add_attribute("details", AnyValue::String(details.clone().into()));
        }

        logger.emit(record);
    }

    /// Chooses the log severity for an event from its type and result.
    ///
    /// A `Failure` or `Denied` result takes precedence: authentication and
    /// policy rejections map to `Warn`, everything else to `Error`. For any
    /// other result the severity depends on the event type alone.
    fn event_to_severity(&self, event_type: &EventType, result: &EventResult) -> Severity {
        if matches!(result, EventResult::Failure | EventResult::Denied) {
            return match event_type {
                EventType::AuthFailure
                | EventType::AuthRateLimited
                | EventType::TransferDenied
                | EventType::CommandBlocked => Severity::Warn,
                _ => Severity::Error,
            };
        }

        // Kept exhaustive (no wildcard) so that adding an `EventType` variant
        // forces a decision about its severity here.
        match event_type {
            EventType::SuspiciousActivity | EventType::IpBlocked => Severity::Error,
            EventType::AuthFailure
            | EventType::AuthRateLimited
            | EventType::TransferDenied
            | EventType::CommandBlocked => Severity::Warn,
            EventType::IpUnblocked
            | EventType::AuthSuccess
            | EventType::SessionStart
            | EventType::SessionEnd
            | EventType::FileOpenRead
            | EventType::FileOpenWrite
            | EventType::FileRead
            | EventType::FileWrite
            | EventType::FileClose
            | EventType::FileUploaded
            | EventType::FileDownloaded
            | EventType::FileDeleted
            | EventType::FileRenamed
            | EventType::DirectoryCreated
            | EventType::DirectoryDeleted
            | EventType::DirectoryListed
            | EventType::CommandExecuted => Severity::Info,
            EventType::TransferAllowed => Severity::Debug,
        }
    }
}
#[async_trait]
impl AuditExporter for OtelExporter {
    /// Emits a single audit event while holding the provider's read lock.
    ///
    /// Always returns `Ok(())`; emitting a record reports no error here.
    async fn export(&self, event: AuditEvent) -> Result<()> {
        let guard = self.logger_provider.read().await;
        self.emit_event(&*guard, &event);
        Ok(())
    }

    /// Emits every event in `events`, in order, under one read lock.
    ///
    /// Always returns `Ok(())`.
    async fn export_batch(&self, events: &[AuditEvent]) -> Result<()> {
        let guard = self.logger_provider.read().await;
        events
            .iter()
            .for_each(|event| self.emit_event(&*guard, event));
        Ok(())
    }

    /// Forces the logger provider to flush, under the read lock.
    ///
    /// Returns an error if the provider reports a flush failure.
    async fn flush(&self) -> Result<()> {
        self.logger_provider
            .read()
            .await
            .force_flush()
            .context("failed to flush OTLP log exporter")?;
        Ok(())
    }

    /// Shuts the logger provider down, under the write lock.
    ///
    /// Returns an error if the provider reports a shutdown failure.
    async fn close(&self) -> Result<()> {
        self.logger_provider
            .write()
            .await
            .shutdown()
            .context("failed to shutdown OTLP log exporter")?;
        Ok(())
    }
}
/// Manual `Debug` implementation that prints only the endpoint; the logger
/// provider field is left out of the output.
impl std::fmt::Debug for OtelExporter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("OtelExporter");
        builder.field("endpoint", &self.endpoint);
        builder.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Endpoint shared by all tests in this module.
    const TEST_ENDPOINT: &str = "http://localhost:4317";

    /// Builds an exporter pointed at the shared test endpoint.
    fn local_exporter() -> OtelExporter {
        OtelExporter::new(TEST_ENDPOINT).unwrap()
    }

    #[tokio::test]
    async fn test_event_to_severity_security_events() {
        let exporter = local_exporter();
        let cases = [
            (
                EventType::SuspiciousActivity,
                EventResult::Success,
                Severity::Error,
            ),
            (EventType::IpBlocked, EventResult::Success, Severity::Error),
            (EventType::AuthFailure, EventResult::Failure, Severity::Warn),
            (
                EventType::AuthRateLimited,
                EventResult::Denied,
                Severity::Warn,
            ),
        ];

        for (event_type, result, expected) in &cases {
            assert_eq!(exporter.event_to_severity(event_type, result), *expected);
        }
    }

    #[tokio::test]
    async fn test_event_to_severity_normal_operations() {
        let exporter = local_exporter();
        let info_events = [
            EventType::AuthSuccess,
            EventType::FileUploaded,
            EventType::CommandExecuted,
        ];

        for event_type in &info_events {
            assert_eq!(
                exporter.event_to_severity(event_type, &EventResult::Success),
                Severity::Info
            );
        }
    }

    #[tokio::test]
    #[ignore = "Requires running OTLP collector; SimpleLogProcessor blocks on gRPC send"]
    async fn test_export_single_event() {
        let exporter = local_exporter();
        let session_start = AuditEvent::new(
            EventType::SessionStart,
            "charlie".to_string(),
            "session-789".to_string(),
        );

        assert!(exporter.export(session_start).await.is_ok());
    }

    #[tokio::test]
    #[ignore = "Requires running OTLP collector; SimpleLogProcessor blocks on gRPC send"]
    async fn test_export_batch() {
        let exporter = local_exporter();
        let login = AuditEvent::new(
            EventType::AuthSuccess,
            "user1".to_string(),
            "session-1".to_string(),
        );
        let upload = AuditEvent::new(
            EventType::FileUploaded,
            "user2".to_string(),
            "session-2".to_string(),
        );
        let batch = vec![login, upload];

        assert!(exporter.export_batch(&batch).await.is_ok());
    }

    #[tokio::test]
    async fn test_flush() {
        let exporter = local_exporter();
        assert!(exporter.flush().await.is_ok());
    }

    #[tokio::test]
    async fn test_close() {
        let exporter = local_exporter();
        assert!(exporter.close().await.is_ok());
    }

    #[tokio::test]
    async fn test_debug_impl() {
        let exporter = local_exporter();
        let rendered = format!("{:?}", exporter);

        for needle in ["OtelExporter", "endpoint"] {
            assert!(rendered.contains(needle));
        }
    }
}