1use std::fmt;
49use std::fs::{File, OpenOptions};
50use std::io::{self, Write};
51use std::path::Path;
52use std::sync::{Arc, Mutex};
53use std::time::SystemTime;
54
55use async_trait::async_trait;
56use chrono::{DateTime, Utc};
57use serde_json::json;
58use tracing::{debug, error, info, trace, warn, Level};
59use warp::hyper::StatusCode;
60
/// Events describing HTTP server activity.
///
/// Values of this type are built by the `EventBus::publish_*` helpers and handed to every
/// registered subscriber.
#[derive(Debug, Clone)]
pub enum HttpEvent {
    /// The server has started.
    ServerStarted {
        /// Address string supplied by the publisher (presumably the bound socket address —
        /// confirm against the caller).
        address: String,
    },

    /// The server has stopped.
    ServerStopped,

    /// A request was received.
    RequestReceived {
        /// Request method, as a string.
        method: String,
        /// Request path, as a string.
        path: String,
        /// Client IP, when known; the plain-text log prints `unknown` when this is `None`.
        client_ip: Option<String>,
        /// Timestamp of the request; `EventBus::publish_request_received` sets it to
        /// `Utc::now()` at publish time.
        timestamp: DateTime<Utc>,
    },

    /// A response was sent.
    ResponseSent {
        /// Response status code; logged via `as_u16()`.
        status: StatusCode,
        /// Response size (presumably in bytes — confirm against the caller).
        size: usize,
        /// Duration value supplied by the publisher (milliseconds, going by the field name).
        duration_ms: u64,
    },

    /// A message was received.
    MessageReceived {
        /// Message identifier.
        id: String,
        /// Message type; logged under the key `type`.
        type_: String,
        /// Sender, when known.
        from: Option<String>,
        /// Recipient, when known.
        to: Option<String>,
    },

    /// An error occurred while handling a message.
    MessageError {
        /// Error category, as a string.
        error_type: String,
        /// Error description.
        message: String,
        /// Identifier of the affected message, when known.
        message_id: Option<String>,
    },
}
120
/// Where formatted log lines are sent.
///
/// `Debug` is implemented by hand for this type because the `Custom` variant wraps a closure.
#[derive(Clone)]
pub enum LogDestination {
    /// Emit log lines through the `tracing` macros.
    Console,

    /// Append log lines to a file.
    File {
        /// Path of the log file.
        path: String,

        /// Optional size limit (presumably in bytes — confirm).
        /// NOTE(review): the logger code visible in this file does not read this field.
        max_size: Option<usize>,

        /// Whether the file should be rotated.
        /// NOTE(review): the logger code visible in this file does not read this field.
        rotate: bool,
    },

    /// Hand each formatted log line to a caller-supplied function.
    Custom(Arc<dyn Fn(&str) + Send + Sync>),
}
142
143impl fmt::Debug for LogDestination {
145 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
146 match self {
147 LogDestination::Console => write!(f, "LogDestination::Console"),
148 LogDestination::File {
149 path,
150 max_size,
151 rotate,
152 } => f
153 .debug_struct("LogDestination::File")
154 .field("path", path)
155 .field("max_size", max_size)
156 .field("rotate", rotate)
157 .finish(),
158 LogDestination::Custom(_) => write!(f, "LogDestination::Custom(<function>)"),
159 }
160 }
161}
162
163use serde::{Deserialize, Deserializer, Serialize, Serializer};
164
/// Configuration for the event logger.
#[derive(Debug, Clone)]
pub struct EventLoggerConfig {
    /// Where log lines are written.
    pub destination: LogDestination,

    /// When `true`, events are rendered as JSON objects; otherwise as plain text lines.
    pub structured: bool,

    /// Level used when log lines are emitted through `tracing` for the console destination.
    pub log_level: Level,
}
177
178impl Serialize for EventLoggerConfig {
180 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
181 where
182 S: Serializer,
183 {
184 use serde::ser::SerializeStruct;
185 let mut state = serializer.serialize_struct("EventLoggerConfig", 3)?;
186
187 match &self.destination {
189 LogDestination::Console => {
190 state.serialize_field("destination_type", "console")?;
191 state.serialize_field("destination_path", "")?;
192 }
193 LogDestination::File { path, .. } => {
194 state.serialize_field("destination_type", "file")?;
195 state.serialize_field("destination_path", path)?;
196 }
197 LogDestination::Custom(_) => {
198 state.serialize_field("destination_type", "custom")?;
199 state.serialize_field("destination_path", "")?;
200 }
201 }
202
203 state.serialize_field("structured", &self.structured)?;
204 state.serialize_field("log_level", &format!("{:?}", self.log_level))?;
205 state.end()
206 }
207}
208
209impl<'de> Deserialize<'de> for EventLoggerConfig {
210 fn deserialize<D>(_deserializer: D) -> Result<Self, D::Error>
211 where
212 D: Deserializer<'de>,
213 {
214 Ok(EventLoggerConfig::default())
217 }
218}
219
// Inherent methods for `EventLoggerConfig`; none are defined at present.
impl EventLoggerConfig {
}
223
224impl Default for EventLoggerConfig {
225 fn default() -> Self {
226 Self {
227 destination: LogDestination::File {
228 path: "./logs/tap-http.log".to_string(),
229 max_size: Some(10 * 1024 * 1024), rotate: true,
231 },
232 structured: true,
233 log_level: Level::INFO,
234 }
235 }
236}
237
/// A receiver of [`HttpEvent`]s.
///
/// A blanket implementation in this file provides this trait for every `Send + Sync + 'static`
/// type that implements `HandleEvent`.
pub trait EventSubscriber: Send + Sync {
    /// Handles one event, returning a boxed future that completes when handling is done.
    fn handle_event(&self, event: HttpEvent) -> futures::future::BoxFuture<'_, ()>;
}
246
247impl<T> EventSubscriber for T
249where
250 T: Send + Sync + 'static,
251 T: for<'a> HandleEvent<'a>,
252{
253 fn handle_event(&self, event: HttpEvent) -> futures::future::BoxFuture<'_, ()> {
254 Box::pin(self.handle_event_async(event))
255 }
256}
257
/// Async-style event handler, declared through `async_trait`.
///
/// NOTE(review): the lifetime parameter `'a` is not used by the method declared here; the
/// implementations in this file use `HandleEvent<'_>`.
#[async_trait]
pub trait HandleEvent<'a>: Send + Sync {
    /// Handles one event.
    async fn handle_event_async(&self, event: HttpEvent);
}
263
/// Dispatches [`HttpEvent`]s to a list of registered subscribers.
pub struct EventBus {
    /// Registered subscribers, guarded by a mutex so they can be modified through `&self`.
    subscribers: Mutex<Vec<Arc<Box<dyn EventSubscriber>>>>,
}
269
270impl EventBus {
271 pub fn new() -> Self {
273 Self {
274 subscribers: Mutex::new(Vec::new()),
275 }
276 }
277
278 pub fn subscribe<S>(&self, subscriber: S)
280 where
281 S: EventSubscriber + 'static,
282 {
283 let boxed = Arc::new(Box::new(subscriber) as Box<dyn EventSubscriber>);
284 let mut subscribers = self.subscribers.lock().unwrap();
285 subscribers.push(boxed);
286 }
287
288 pub fn unsubscribe(&self, subscriber: &Arc<Box<dyn EventSubscriber>>) {
290 let mut subscribers = self.subscribers.lock().unwrap();
291 subscribers.retain(|s| !Arc::ptr_eq(s, subscriber));
292 }
293
294 pub async fn publish_server_started(&self, address: String) {
296 let event = HttpEvent::ServerStarted { address };
297 self.publish_event(event).await;
298 }
299
300 pub async fn publish_server_stopped(&self) {
302 let event = HttpEvent::ServerStopped;
303 self.publish_event(event).await;
304 }
305
306 pub async fn publish_request_received(
308 &self,
309 method: String,
310 path: String,
311 client_ip: Option<String>,
312 ) {
313 let event = HttpEvent::RequestReceived {
314 method,
315 path,
316 client_ip,
317 timestamp: Utc::now(),
318 };
319 self.publish_event(event).await;
320 }
321
322 pub async fn publish_response_sent(&self, status: StatusCode, size: usize, duration_ms: u64) {
324 let event = HttpEvent::ResponseSent {
325 status,
326 size,
327 duration_ms,
328 };
329 self.publish_event(event).await;
330 }
331
332 pub async fn publish_message_received(
334 &self,
335 id: String,
336 type_: String,
337 from: Option<String>,
338 to: Option<String>,
339 ) {
340 let event = HttpEvent::MessageReceived {
341 id,
342 type_,
343 from,
344 to,
345 };
346 self.publish_event(event).await;
347 }
348
349 pub async fn publish_message_error(
351 &self,
352 error_type: String,
353 message: String,
354 message_id: Option<String>,
355 ) {
356 let event = HttpEvent::MessageError {
357 error_type,
358 message,
359 message_id,
360 };
361 self.publish_event(event).await;
362 }
363
364 async fn publish_event(&self, event: HttpEvent) {
366 let subscribers = self.subscribers.lock().unwrap().clone();
368 for subscriber in subscribers.iter() {
369 let fut = subscriber.handle_event(event.clone());
370 fut.await;
371 }
372 }
373
374 pub fn subscriber_count(&self) -> usize {
376 self.subscribers.lock().unwrap().len()
377 }
378}
379
380impl Default for EventBus {
381 fn default() -> Self {
382 Self::new()
383 }
384}
385
/// Subscriber that formats [`HttpEvent`]s and writes them to the configured destination.
pub struct EventLogger {
    /// Logger configuration (destination, format and level).
    config: EventLoggerConfig,

    /// Open log file handle; `Some` only when the destination is a file and it was opened
    /// successfully in `EventLogger::new`.
    file: Option<Arc<Mutex<File>>>,
}
398
399impl EventLogger {
400 pub fn new(config: EventLoggerConfig) -> Self {
402 let file = match &config.destination {
403 LogDestination::File { path, .. } => match Self::open_log_file(path) {
404 Ok(file) => Some(Arc::new(Mutex::new(file))),
405 Err(err) => {
406 error!("Failed to open log file {}: {}", path, err);
407 None
408 }
409 },
410 _ => None,
411 };
412
413 Self { config, file }
414 }
415
416 fn open_log_file(path: &str) -> io::Result<File> {
418 if let Some(parent) = Path::new(path).parent() {
420 std::fs::create_dir_all(parent)?;
421 }
422
423 OpenOptions::new().create(true).append(true).open(path)
425 }
426
427 fn log_event(&self, event: &HttpEvent) -> crate::error::Result<()> {
429 let log_message = if self.config.structured {
430 self.format_structured_log(event)?
431 } else {
432 self.format_plain_log(event)
433 };
434
435 match &self.config.destination {
436 LogDestination::Console => {
437 match self.config.log_level {
439 Level::ERROR => error!("{}", log_message),
440 Level::WARN => warn!("{}", log_message),
441 Level::INFO => info!("{}", log_message),
442 Level::DEBUG => debug!("{}", log_message),
443 Level::TRACE => trace!("{}", log_message),
444 }
445 Ok(())
446 }
447 LogDestination::File { .. } => {
448 if let Some(file) = &self.file {
449 let mut file_guard = file.lock().map_err(|_| {
450 crate::error::Error::Config("Failed to acquire log file lock".to_string())
451 })?;
452
453 writeln!(file_guard, "{}", log_message).map_err(|err| {
455 crate::error::Error::Config(format!("Failed to write to log file: {}", err))
456 })?;
457
458 file_guard.flush().map_err(|err| {
460 crate::error::Error::Config(format!("Failed to flush log file: {}", err))
461 })?;
462
463 Ok(())
464 } else {
465 error!("{}", log_message);
467 Ok(())
468 }
469 }
470 LogDestination::Custom(func) => {
471 func(&log_message);
473 Ok(())
474 }
475 }
476 }
477
478 fn format_plain_log(&self, event: &HttpEvent) -> String {
480 let timestamp = DateTime::<Utc>::from(SystemTime::now()).format("%Y-%m-%dT%H:%M:%S%.3fZ");
481
482 match event {
483 HttpEvent::ServerStarted { address } => {
484 format!("[{}] SERVER STARTED: address={}", timestamp, address)
485 }
486 HttpEvent::ServerStopped => {
487 format!("[{}] SERVER STOPPED", timestamp)
488 }
489 HttpEvent::RequestReceived {
490 method,
491 path,
492 client_ip,
493 timestamp,
494 } => {
495 format!(
496 "[{}] REQUEST RECEIVED: method={}, path={}, client_ip={}, timestamp={}",
497 timestamp,
498 method,
499 path,
500 client_ip.as_deref().unwrap_or("unknown"),
501 timestamp.format("%Y-%m-%dT%H:%M:%S%.3fZ")
502 )
503 }
504 HttpEvent::ResponseSent {
505 status,
506 size,
507 duration_ms,
508 } => {
509 format!(
510 "[{}] RESPONSE SENT: status={}, size={}, duration_ms={}",
511 timestamp,
512 status.as_u16(),
513 size,
514 duration_ms
515 )
516 }
517 HttpEvent::MessageReceived {
518 id,
519 type_,
520 from,
521 to,
522 } => {
523 format!(
524 "[{}] MESSAGE RECEIVED: id={}, type={}, from={}, to={}",
525 timestamp,
526 id,
527 type_,
528 from.as_deref().unwrap_or("unknown"),
529 to.as_deref().unwrap_or("unknown")
530 )
531 }
532 HttpEvent::MessageError {
533 error_type,
534 message,
535 message_id,
536 } => {
537 format!(
538 "[{}] MESSAGE ERROR: type={}, message={}, message_id={}",
539 timestamp,
540 error_type,
541 message,
542 message_id.as_deref().unwrap_or("unknown")
543 )
544 }
545 }
546 }
547
548 fn format_structured_log(&self, event: &HttpEvent) -> crate::error::Result<String> {
550 let timestamp = DateTime::<Utc>::from(SystemTime::now()).to_rfc3339();
552
553 let (event_type, event_data) = match event {
555 HttpEvent::ServerStarted { address } => (
556 "server_started",
557 json!({
558 "address": address,
559 }),
560 ),
561 HttpEvent::ServerStopped => ("server_stopped", json!({})),
562 HttpEvent::RequestReceived {
563 method,
564 path,
565 client_ip,
566 timestamp,
567 } => (
568 "request_received",
569 json!({
570 "method": method,
571 "path": path,
572 "client_ip": client_ip,
573 "request_timestamp": timestamp.to_rfc3339(),
574 }),
575 ),
576 HttpEvent::ResponseSent {
577 status,
578 size,
579 duration_ms,
580 } => (
581 "response_sent",
582 json!({
583 "status": status.as_u16(),
584 "size": size,
585 "duration_ms": duration_ms,
586 }),
587 ),
588 HttpEvent::MessageReceived {
589 id,
590 type_,
591 from,
592 to,
593 } => (
594 "message_received",
595 json!({
596 "id": id,
597 "type": type_,
598 "from": from,
599 "to": to,
600 }),
601 ),
602 HttpEvent::MessageError {
603 error_type,
604 message,
605 message_id,
606 } => (
607 "message_error",
608 json!({
609 "error_type": error_type,
610 "message": message,
611 "message_id": message_id,
612 }),
613 ),
614 };
615
616 let log_entry = json!({
618 "timestamp": timestamp,
619 "event_type": event_type,
620 "data": event_data,
621 });
622
623 serde_json::to_string(&log_entry).map_err(|err| crate::error::Error::Json(err.to_string()))
625 }
626}
627
628#[async_trait]
629impl HandleEvent<'_> for EventLogger {
630 async fn handle_event_async(&self, event: HttpEvent) {
631 if let Err(err) = self.log_event(&event) {
632 error!("Failed to log event: {}", err);
633 }
634 }
635}
636
637impl fmt::Debug for EventLogger {
638 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
639 f.debug_struct("EventLogger")
640 .field("config", &self.config)
641 .field("file", &self.file.is_some())
642 .finish()
643 }
644}
645
#[cfg(test)]
mod tests {
    use super::*;

    /// Publishing two events delivers both, in order, to a registered subscriber.
    #[tokio::test]
    async fn test_event_bus_publish() {
        /// Subscriber that records every event it receives.
        struct TestSubscriber {
            events: Arc<Mutex<Vec<HttpEvent>>>,
        }

        #[async_trait]
        impl HandleEvent<'_> for TestSubscriber {
            async fn handle_event_async(&self, event: HttpEvent) {
                let mut recorded = self.events.lock().unwrap();
                recorded.push(event);
            }
        }

        let event_bus = EventBus::new();

        // Shared with the subscriber so the test can inspect what was delivered.
        let events = Arc::new(Mutex::new(Vec::new()));
        event_bus.subscribe(TestSubscriber {
            events: events.clone(),
        });

        let address = "127.0.0.1:8000".to_string();
        event_bus.publish_server_started(address).await;

        let method = "GET".to_string();
        let path = "/didcomm".to_string();
        let client_ip = Some("192.168.1.1".to_string());
        event_bus
            .publish_request_received(method, path, client_ip)
            .await;

        let received_events = events.lock().unwrap();
        assert_eq!(received_events.len(), 2);

        if let HttpEvent::ServerStarted { address } = &received_events[0] {
            assert_eq!(address, "127.0.0.1:8000");
        } else {
            panic!("Expected ServerStarted event");
        }

        if let HttpEvent::RequestReceived {
            method,
            path,
            client_ip,
            ..
        } = &received_events[1]
        {
            assert_eq!(method, "GET");
            assert_eq!(path, "/didcomm");
            assert_eq!(client_ip, &Some("192.168.1.1".to_string()));
        } else {
            panic!("Expected RequestReceived event");
        }
    }
}