1use std::sync::Arc;
8use std::collections::HashMap;
9use crate::Key;
10
/// Correlation data passed to the `*_with_context` observer hooks.
///
/// All identifiers are optional. [`ObservationContext::correlation_id`]
/// combines whichever ones are set into a single string.
#[derive(Debug, Clone, Default)]
pub struct ObservationContext {
    /// Run identifier; the first component of the correlation ID.
    pub run_id: Option<String>,
    /// Workflow name; the second component of the correlation ID.
    pub workflow_name: Option<String>,
    /// Node identifier; only appears in the correlation ID when both the run
    /// ID and the workflow name are set.
    pub node_id: Option<String>,
    /// String key/value pairs, e.g. those added through
    /// [`ObservationContext::with_metadata`].
    pub metadata: HashMap<String, String>,
}

impl ObservationContext {
    /// Creates a context with no identifiers and empty metadata.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a context that carries only `run_id`.
    pub fn with_run_id(run_id: impl Into<String>) -> Self {
        Self {
            run_id: Some(run_id.into()),
            ..Self::default()
        }
    }

    /// Creates a context with a run ID, a workflow name and an optional node ID.
    ///
    /// Pass `None::<String>` (or any other concrete `Into<String>` type) when
    /// there is no node, so the generic parameter can be inferred.
    pub fn workflow(
        run_id: impl Into<String>,
        workflow_name: impl Into<String>,
        node_id: Option<impl Into<String>>,
    ) -> Self {
        Self {
            run_id: Some(run_id.into()),
            workflow_name: Some(workflow_name.into()),
            node_id: node_id.map(Into::into),
            ..Self::default()
        }
    }

    /// Builder-style helper: stores `value` under `key` in the metadata map
    /// (replacing any previous value for that key) and returns the context.
    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let owned_key = key.into();
        let owned_value = value.into();
        self.metadata.insert(owned_key, owned_value);
        self
    }

    /// Builds a colon-separated identifier from the fields that are set.
    ///
    /// * run + workflow + node -> `run:workflow:node`
    /// * run + workflow        -> `run:workflow`
    /// * run only              -> `run` (a node ID without a workflow name is ignored)
    /// * no run ID             -> the literal `no-context`
    pub fn correlation_id(&self) -> String {
        // Without a run ID nothing else is reported, whatever the other fields hold.
        let run_id = match self.run_id.as_deref() {
            Some(id) => id,
            None => return "no-context".to_string(),
        };

        match (self.workflow_name.as_deref(), self.node_id.as_deref()) {
            (Some(workflow), Some(node)) => format!("{}:{}:{}", run_id, workflow, node),
            (Some(workflow), None) => format!("{}:{}", run_id, workflow),
            (None, _) => run_id.to_string(),
        }
    }
}
88
89pub trait DiObserver: Send + Sync {
166 fn resolving(&self, key: &Key);
175
176 fn resolved(&self, key: &Key, duration: std::time::Duration);
186
187 fn factory_panic(&self, key: &Key, message: &str);
198
199 fn resolving_with_context(&self, key: &Key, context: &ObservationContext) {
209 self.resolving(key);
211
212 if let Some(run_id) = &context.run_id {
214 if let Some(workflow_name) = &context.workflow_name {
215 println!("[{}] [{}] Starting resolution: {}",
216 run_id, workflow_name, key.display_name());
217 }
218 }
219 }
220
221 fn resolved_with_context(&self, key: &Key, duration: std::time::Duration, context: &ObservationContext) {
232 self.resolved(key, duration);
234
235 if let Some(run_id) = &context.run_id {
237 if let Some(workflow_name) = &context.workflow_name {
238 println!("[{}] [{}] Completed resolution: {} in {:?}",
239 run_id, workflow_name, key.display_name(), duration);
240 }
241 }
242 }
243
244 fn factory_panic_with_context(&self, key: &Key, message: &str, context: &ObservationContext) {
255 self.factory_panic(key, message);
257
258 if let Some(run_id) = &context.run_id {
260 if let Some(workflow_name) = &context.workflow_name {
261 eprintln!("[{}] [{}] FACTORY PANIC in {}: {}",
262 run_id, workflow_name, key.display_name(), message);
263 }
264 }
265 }
266}
267
268#[derive(Default)]
274pub(crate) struct Observers {
275 observers: Vec<Arc<dyn DiObserver>>,
276}
277
278impl Observers {
279 pub(crate) fn new() -> Self {
281 Self {
282 observers: Vec::new(),
283 }
284 }
285
286 pub(crate) fn add(&mut self, observer: Arc<dyn DiObserver>) {
288 self.observers.push(observer);
289 }
290
291 #[inline]
293 pub(crate) fn has_observers(&self) -> bool {
294 !self.observers.is_empty()
295 }
296
297
298 #[inline]
300 #[allow(dead_code)]
301 pub(crate) fn factory_panic(&self, key: &Key, message: &str) {
302 for observer in &self.observers {
303 observer.factory_panic(key, message);
304 }
305 }
306
307 #[inline]
309 pub(crate) fn resolving_with_context(&self, key: &Key, context: &ObservationContext) {
310 for observer in &self.observers {
311 observer.resolving_with_context(key, context);
312 }
313 }
314
315 #[inline]
317 pub(crate) fn resolved_with_context(&self, key: &Key, duration: std::time::Duration, context: &ObservationContext) {
318 for observer in &self.observers {
319 observer.resolved_with_context(key, duration, context);
320 }
321 }
322
323 #[inline]
325 #[allow(dead_code)]
326 pub(crate) fn factory_panic_with_context(&self, key: &Key, message: &str, context: &ObservationContext) {
327 for observer in &self.observers {
328 observer.factory_panic_with_context(key, message, context);
329 }
330 }
331}
332
/// Observer that prints every event it receives, each line starting with a
/// configurable prefix.
///
/// Derives `Debug` and `Clone` so it can be embedded in types that derive
/// those traits, matching [`ObservationContext`].
#[derive(Debug, Clone)]
pub struct LoggingObserver {
    /// Text placed at the start of every emitted line.
    prefix: String,
}

impl LoggingObserver {
    /// Creates an observer with the default prefix `[ferrous-di]`.
    pub fn new() -> Self {
        Self::with_prefix("[ferrous-di]")
    }

    /// Creates an observer that starts every line with `prefix`.
    pub fn with_prefix(prefix: impl Into<String>) -> Self {
        Self {
            prefix: prefix.into(),
        }
    }
}

impl Default for LoggingObserver {
    /// Same as [`LoggingObserver::new`].
    fn default() -> Self {
        Self::new()
    }
}
376
377impl DiObserver for LoggingObserver {
378 fn resolving(&self, key: &Key) {
379 println!("{} Resolving: {}", self.prefix, key.display_name());
380 }
381
382 fn resolved(&self, key: &Key, duration: std::time::Duration) {
383 println!("{} Resolved: {} in {:?}",
384 self.prefix, key.display_name(), duration);
385 }
386
387 fn factory_panic(&self, key: &Key, message: &str) {
388 eprintln!("{} FACTORY PANIC in {}: {}",
389 self.prefix, key.display_name(), message);
390 }
391
392 fn resolving_with_context(&self, key: &Key, context: &ObservationContext) {
393 println!("{} [{}] Resolving: {}",
394 self.prefix, context.correlation_id(), key.display_name());
395 }
396
397 fn resolved_with_context(&self, key: &Key, duration: std::time::Duration, context: &ObservationContext) {
398 println!("{} [{}] Resolved: {} in {:?}",
399 self.prefix, context.correlation_id(), key.display_name(), duration);
400 }
401
402 fn factory_panic_with_context(&self, key: &Key, message: &str, context: &ObservationContext) {
403 eprintln!("{} [{}] FACTORY PANIC in {}: {}",
404 self.prefix, context.correlation_id(), key.display_name(), message);
405 }
406}
407
/// Observer that prints events tagged with run / workflow / node identifiers.
///
/// Derives `Debug` and `Clone` so it can be embedded in types that derive
/// those traits, matching [`ObservationContext`].
#[derive(Debug, Clone)]
pub struct WorkflowObserver {
    /// Label printed in square brackets at the start of every line.
    name: String,
    /// When `false`, the `resolving`, `resolved` and `resolved_with_context`
    /// hooks print nothing.
    track_performance: bool,
}

impl WorkflowObserver {
    /// Creates an observer named `workflow` with performance tracking enabled.
    pub fn new() -> Self {
        Self::with_name("workflow")
    }

    /// Creates an observer with the given `name` and performance tracking enabled.
    pub fn with_name(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            track_performance: true,
        }
    }

    /// Builder-style setter for the performance-tracking flag.
    pub fn with_performance_tracking(mut self, enabled: bool) -> Self {
        self.track_performance = enabled;
        self
    }
}

impl Default for WorkflowObserver {
    /// Same as [`WorkflowObserver::new`].
    fn default() -> Self {
        Self::new()
    }
}
460
461impl DiObserver for WorkflowObserver {
462 fn resolving(&self, key: &Key) {
463 if self.track_performance {
465 println!("[{}] Resolving: {}", self.name, key.display_name());
466 }
467 }
468
469 fn resolved(&self, key: &Key, duration: std::time::Duration) {
470 if self.track_performance {
471 println!("[{}] Resolved: {} in {:?}",
472 self.name, key.display_name(), duration);
473 }
474 }
475
476 fn factory_panic(&self, key: &Key, message: &str) {
477 eprintln!("[{}] PANIC in {}: {}",
478 self.name, key.display_name(), message);
479 }
480
481 fn resolving_with_context(&self, key: &Key, context: &ObservationContext) {
482 match (&context.run_id, &context.workflow_name, &context.node_id) {
484 (Some(run_id), Some(workflow), Some(node)) => {
485 println!("[{}] [run:{}] [workflow:{}] [node:{}] Resolving: {}",
486 self.name, run_id, workflow, node, key.display_name());
487 }
488 (Some(run_id), Some(workflow), None) => {
489 println!("[{}] [run:{}] [workflow:{}] Resolving: {}",
490 self.name, run_id, workflow, key.display_name());
491 }
492 (Some(run_id), None, _) => {
493 println!("[{}] [run:{}] Resolving: {}",
494 self.name, run_id, key.display_name());
495 }
496 _ => {
497 println!("[{}] [no-context] Resolving: {}",
498 self.name, key.display_name());
499 }
500 }
501 }
502
503 fn resolved_with_context(&self, key: &Key, duration: std::time::Duration, context: &ObservationContext) {
504 if self.track_performance {
505 match (&context.run_id, &context.workflow_name, &context.node_id) {
506 (Some(run_id), Some(workflow), Some(node)) => {
507 println!("[{}] [run:{}] [workflow:{}] [node:{}] Resolved: {} in {:?}",
508 self.name, run_id, workflow, node, key.display_name(), duration);
509 }
510 (Some(run_id), Some(workflow), None) => {
511 println!("[{}] [run:{}] [workflow:{}] Resolved: {} in {:?}",
512 self.name, run_id, workflow, key.display_name(), duration);
513 }
514 (Some(run_id), None, _) => {
515 println!("[{}] [run:{}] Resolved: {} in {:?}",
516 self.name, run_id, key.display_name(), duration);
517 }
518 _ => {
519 println!("[{}] [no-context] Resolved: {} in {:?}",
520 self.name, key.display_name(), duration);
521 }
522 }
523 }
524 }
525
526 fn factory_panic_with_context(&self, key: &Key, message: &str, context: &ObservationContext) {
527 eprintln!("[{}] [{}] PANIC in {}: {}",
528 self.name, context.correlation_id(), key.display_name(), message);
529 }
530}
531
532pub trait WorkflowContextProvider {
537 fn observation_context(&self) -> ObservationContext;
539}
540
541impl WorkflowContextProvider for crate::WorkflowContext {
543 fn observation_context(&self) -> ObservationContext {
544 ObservationContext::workflow(
545 self.run_id(),
546 self.workflow_name(),
547 None::<String>,
548 )
549 .with_metadata("started_at", format!("{:?}", self.started_at()))
550 .with_metadata("elapsed", format!("{:?}", self.elapsed()))
551 }
552}
553
/// Observer that accumulates counters in atomics instead of printing.
///
/// All loads and stores use `Ordering::Relaxed`, so values read while other
/// threads are updating the counters are individually valid but not a
/// consistent snapshot of all three fields.
///
/// Derives `Debug` so the type can be logged or embedded in `Debug` types,
/// and `Default` (all counters zero).
#[derive(Debug, Default)]
pub struct MetricsObserver {
    /// Number of resolutions recorded.
    pub resolution_count: std::sync::atomic::AtomicU64,
    /// Sum of recorded resolution durations, in nanoseconds.
    pub total_resolution_time: std::sync::atomic::AtomicU64,
    /// Number of factory panics recorded.
    pub panic_count: std::sync::atomic::AtomicU64,
}

impl MetricsObserver {
    /// Creates an observer with every counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the number of resolutions recorded so far.
    pub fn resolution_count(&self) -> u64 {
        self.resolution_count
            .load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Returns the mean resolution time (integer division of the nanosecond
    /// total by the count), or `None` when nothing has been recorded.
    pub fn average_resolution_time(&self) -> Option<std::time::Duration> {
        let count = self.resolution_count();
        let total_ns = self
            .total_resolution_time
            .load(std::sync::atomic::Ordering::Relaxed);

        // `checked_div` yields `None` for a zero count, which is exactly the
        // "no data yet" case.
        total_ns
            .checked_div(count)
            .map(std::time::Duration::from_nanos)
    }

    /// Returns the accumulated resolution time.
    pub fn total_resolution_time(&self) -> std::time::Duration {
        let total_ns = self
            .total_resolution_time
            .load(std::sync::atomic::Ordering::Relaxed);
        std::time::Duration::from_nanos(total_ns)
    }

    /// Returns the number of factory panics recorded so far.
    pub fn panic_count(&self) -> u64 {
        self.panic_count.load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Sets every counter back to zero.
    ///
    /// The three stores are independent; a concurrent reader may observe some
    /// counters already reset and others not yet.
    pub fn reset(&self) {
        self.resolution_count
            .store(0, std::sync::atomic::Ordering::Relaxed);
        self.total_resolution_time
            .store(0, std::sync::atomic::Ordering::Relaxed);
        self.panic_count
            .store(0, std::sync::atomic::Ordering::Relaxed);
    }
}
614
615impl DiObserver for MetricsObserver {
616 fn resolving(&self, _key: &Key) {
617 }
619
620 fn resolved(&self, _key: &Key, duration: std::time::Duration) {
621 self.resolution_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
622 self.total_resolution_time.fetch_add(duration.as_nanos() as u64, std::sync::atomic::Ordering::Relaxed);
623 }
624
625 fn factory_panic(&self, _key: &Key, _message: &str) {
626 self.panic_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
627 }
628
629 fn resolved_with_context(&self, key: &Key, duration: std::time::Duration, _context: &ObservationContext) {
630 self.resolved(key, duration);
632 }
633
634 fn factory_panic_with_context(&self, key: &Key, message: &str, _context: &ObservationContext) {
635 self.factory_panic(key, message);
637 }
638}
639
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::time::Duration;

    /// Each constructor produces the expected fields and correlation ID shape.
    #[test]
    fn test_observation_context_creation() {
        let context = ObservationContext::new();
        assert!(context.run_id.is_none());
        assert_eq!(context.correlation_id(), "no-context");

        let context = ObservationContext::with_run_id("run-123");
        assert_eq!(context.run_id.as_deref(), Some("run-123"));
        assert_eq!(context.correlation_id(), "run-123");

        let context = ObservationContext::workflow("run-456", "user_flow", Some("step_1"));
        assert_eq!(context.correlation_id(), "run-456:user_flow:step_1");

        // Without a node ID only run and workflow are joined.
        let context = ObservationContext::workflow("run-456", "user_flow", None::<String>);
        assert!(context.node_id.is_none());
        assert_eq!(context.correlation_id(), "run-456:user_flow");
    }

    /// `with_metadata` stores every pair it is given.
    #[test]
    fn test_observation_context_metadata() {
        let context = ObservationContext::with_run_id("run-123")
            .with_metadata("user_id", "user-456")
            .with_metadata("priority", "high");

        assert_eq!(context.metadata.len(), 2);
        assert_eq!(
            context.metadata.get("user_id").map(String::as_str),
            Some("user-456")
        );
        assert_eq!(
            context.metadata.get("priority").map(String::as_str),
            Some("high")
        );
    }

    /// Smoke test: the hooks only print, so this checks that they do not panic.
    #[test]
    fn test_workflow_observer() {
        let observer = WorkflowObserver::new();
        let key = crate::key_of_type::<String>();
        let context = ObservationContext::workflow("run-123", "test_workflow", Some("node_1"));

        observer.resolving(&key);
        observer.resolved(&key, Duration::from_millis(1));
        observer.resolving_with_context(&key, &context);
        observer.resolved_with_context(&key, Duration::from_millis(1), &context);
    }

    /// Counters, totals and averages hold exact values and `reset` clears them all.
    #[test]
    fn test_metrics_observer() {
        let observer = MetricsObserver::new();
        let key = crate::key_of_type::<String>();

        assert_eq!(observer.resolution_count(), 0);
        assert_eq!(observer.panic_count(), 0);
        assert!(observer.average_resolution_time().is_none());

        observer.resolved(&key, Duration::from_millis(10));
        observer.resolved(&key, Duration::from_millis(20));

        assert_eq!(observer.resolution_count(), 2);
        assert_eq!(
            observer.average_resolution_time(),
            Some(Duration::from_millis(15))
        );
        assert_eq!(observer.total_resolution_time(), Duration::from_millis(30));

        observer.factory_panic(&key, "test panic");
        assert_eq!(observer.panic_count(), 1);

        observer.reset();
        assert_eq!(observer.resolution_count(), 0);
        assert_eq!(observer.panic_count(), 0);
        assert_eq!(observer.total_resolution_time(), Duration::from_nanos(0));
        assert!(observer.average_resolution_time().is_none());

        // The contextual hooks must feed the same counters as the plain ones.
        let context = ObservationContext::with_run_id("run-123");
        observer.resolved_with_context(&key, Duration::from_millis(5), &context);
        observer.factory_panic_with_context(&key, "test panic", &context);
        assert_eq!(observer.resolution_count(), 1);
        assert_eq!(observer.total_resolution_time(), Duration::from_millis(5));
        assert_eq!(observer.panic_count(), 1);
    }

    /// A `WorkflowContext` maps its identifiers and timing data into the context.
    #[test]
    fn test_workflow_context_provider() {
        let workflow_ctx = crate::WorkflowContext::new("test_workflow");
        let obs_ctx = workflow_ctx.observation_context();

        assert_eq!(obs_ctx.run_id.as_ref().unwrap(), workflow_ctx.run_id());
        assert_eq!(obs_ctx.workflow_name.as_ref().unwrap(), "test_workflow");
        assert!(obs_ctx.node_id.is_none());
        assert!(obs_ctx.metadata.contains_key("started_at"));
        assert!(obs_ctx.metadata.contains_key("elapsed"));
    }

    /// Smoke test for the fan-out collection: registration is visible through
    /// `has_observers` and dispatch does not panic.
    #[test]
    fn test_observers_with_context() {
        let mut observers = crate::observer::Observers::new();
        assert!(!observers.has_observers());

        let observer = Arc::new(LoggingObserver::new());
        observers.add(observer);
        assert!(observers.has_observers());

        let key = crate::key_of_type::<String>();
        let context = ObservationContext::workflow("run-123", "test_workflow", None::<String>);

        observers.resolving_with_context(&key, &context);
        observers.resolved_with_context(&key, Duration::from_millis(1), &context);
        observers.factory_panic_with_context(&key, "test", &context);
    }
}