zoey_core/observability/
mod.rs

1pub mod config;
2pub mod cost_tracker;
3pub mod rest;
4pub mod security_monitor;
5pub mod types;
6
7pub use config::*;
8pub use cost_tracker::{get_global_cost_tracker, set_global_cost_tracker, CostTracker};
9pub use rest::start_rest_api;
10pub use security_monitor::{AlertChannel, AlertSeverity, SecurityAlert, SecurityMonitor};
11pub use types::*;
12
13use crate::error::ZoeyError;
14use crate::types::IDatabaseAdapter;
15use crate::AgentRuntime;
16use crate::Plugin;
17use std::any::Any;
18use std::collections::HashMap;
19use std::sync::{Arc, RwLock};
20
21/// Main observability service
22pub struct Observability {
23    pub config: ObservabilityConfig,
24    pub cost_tracker: Option<Arc<CostTracker>>,
25    pub security_monitor: Option<Arc<SecurityMonitor>>,
26    pub rate_limits: Arc<RwLock<HashMap<String, ProviderRateLimit>>>,
27}
28
29impl Observability {
30    pub fn new(
31        config: ObservabilityConfig,
32        db: Option<Arc<dyn IDatabaseAdapter + Send + Sync>>,
33    ) -> Self {
34        // Always initialize a cost tracker; persistence depends on storage availability.
35        let cost_tracker = Some(Arc::new(CostTracker::new(db.clone())));
36
37        let security_monitor = if config.enabled {
38            Some(Arc::new(SecurityMonitor::new(config.clone())))
39        } else {
40            None
41        };
42
43        Self {
44            config,
45            cost_tracker,
46            security_monitor,
47            rate_limits: Arc::new(RwLock::new(HashMap::new())),
48        }
49    }
50
51    /// Initialize observability (called on startup)
52    pub async fn initialize(&self) -> Result<(), ZoeyError> {
53        // Start REST API server if enabled
54        if self.config.rest_api.enabled {
55            let cost_tracker = self.cost_tracker.clone();
56            let rest_config = self.config.rest_api.clone();
57
58            tokio::spawn(async move {
59                if let Err(e) = start_rest_api(rest_config, cost_tracker).await {
60                    tracing::error!("Failed to start REST API: {}", e);
61                }
62            });
63
64            tracing::info!(
65                "Observability REST API started on {}:{}",
66                self.config.rest_api.host,
67                self.config.rest_api.port
68            );
69        }
70
71        Ok(())
72    }
73}
74
75use sha2::{Digest, Sha256};
76
77pub fn compute_prompt_preview(s: &str) -> String {
78    s.chars().take(200).collect()
79}
80
81pub fn compute_prompt_hash(s: &str) -> String {
82    let mut hasher = Sha256::new();
83    hasher.update(s.as_bytes());
84    let hash = hasher.finalize();
85    hex::encode(hash)
86}
87
88/// Observability plugin wrapper to integrate with the plugin system
89pub struct ObservabilityPlugin;
90
91impl ObservabilityPlugin {
92    pub fn new() -> Self {
93        Self
94    }
95}
96
97#[async_trait::async_trait]
98impl Plugin for ObservabilityPlugin {
99    fn name(&self) -> &str {
100        "observability"
101    }
102    fn description(&self) -> &str {
103        "Observability service: cost tracking, security monitor, optional REST API"
104    }
105
106    async fn init(
107        &self,
108        _config: HashMap<String, String>,
109        runtime_any: Arc<dyn Any + Send + Sync>,
110    ) -> crate::Result<()> {
111        let cfg = ObservabilityConfig::from_env();
112
113        // Downcast the erased runtime to Arc<RwLock<AgentRuntime>> and fetch adapter without holding lock across await
114        if let Some(rt_arc) = runtime_any.downcast_ref::<Arc<RwLock<AgentRuntime>>>() {
115            let db_opt = {
116                let rt = rt_arc.read().unwrap();
117                rt.get_adapter()
118            };
119            let obs = Observability::new(cfg, db_opt);
120            obs.initialize()
121                .await
122                .map_err(|e| crate::ZoeyError::other(e.to_string()))?;
123        }
124
125        Ok(())
126    }
127}
128
129impl Observability {
130    pub fn set_rate_limit(&self, provider: &str, rl: ProviderRateLimit) {
131        if let Ok(mut m) = self.rate_limits.write() {
132            m.insert(provider.to_string(), rl);
133        }
134    }
135
136    pub fn get_rate_limit_remaining(&self, provider: &str) -> Option<u32> {
137        self.rate_limits
138            .read()
139            .ok()
140            .and_then(|m| m.get(provider).and_then(|rl| rl.remaining))
141    }
142}