zoey_core/observability/
mod.rs1pub mod config;
2pub mod cost_tracker;
3pub mod rest;
4pub mod security_monitor;
5pub mod types;
6
7pub use config::*;
8pub use cost_tracker::{get_global_cost_tracker, set_global_cost_tracker, CostTracker};
9pub use rest::start_rest_api;
10pub use security_monitor::{AlertChannel, AlertSeverity, SecurityAlert, SecurityMonitor};
11pub use types::*;
12
13use crate::error::ZoeyError;
14use crate::types::IDatabaseAdapter;
15use crate::AgentRuntime;
16use crate::Plugin;
17use std::any::Any;
18use std::collections::HashMap;
19use std::sync::{Arc, RwLock};
20
21pub struct Observability {
23 pub config: ObservabilityConfig,
24 pub cost_tracker: Option<Arc<CostTracker>>,
25 pub security_monitor: Option<Arc<SecurityMonitor>>,
26 pub rate_limits: Arc<RwLock<HashMap<String, ProviderRateLimit>>>,
27}
28
29impl Observability {
30 pub fn new(
31 config: ObservabilityConfig,
32 db: Option<Arc<dyn IDatabaseAdapter + Send + Sync>>,
33 ) -> Self {
34 let cost_tracker = Some(Arc::new(CostTracker::new(db.clone())));
36
37 let security_monitor = if config.enabled {
38 Some(Arc::new(SecurityMonitor::new(config.clone())))
39 } else {
40 None
41 };
42
43 Self {
44 config,
45 cost_tracker,
46 security_monitor,
47 rate_limits: Arc::new(RwLock::new(HashMap::new())),
48 }
49 }
50
51 pub async fn initialize(&self) -> Result<(), ZoeyError> {
53 if self.config.rest_api.enabled {
55 let cost_tracker = self.cost_tracker.clone();
56 let rest_config = self.config.rest_api.clone();
57
58 tokio::spawn(async move {
59 if let Err(e) = start_rest_api(rest_config, cost_tracker).await {
60 tracing::error!("Failed to start REST API: {}", e);
61 }
62 });
63
64 tracing::info!(
65 "Observability REST API started on {}:{}",
66 self.config.rest_api.host,
67 self.config.rest_api.port
68 );
69 }
70
71 Ok(())
72 }
73}
74
75use sha2::{Digest, Sha256};
76
77pub fn compute_prompt_preview(s: &str) -> String {
78 s.chars().take(200).collect()
79}
80
81pub fn compute_prompt_hash(s: &str) -> String {
82 let mut hasher = Sha256::new();
83 hasher.update(s.as_bytes());
84 let hash = hasher.finalize();
85 hex::encode(hash)
86}
87
88pub struct ObservabilityPlugin;
90
91impl ObservabilityPlugin {
92 pub fn new() -> Self {
93 Self
94 }
95}
96
97#[async_trait::async_trait]
98impl Plugin for ObservabilityPlugin {
99 fn name(&self) -> &str {
100 "observability"
101 }
102 fn description(&self) -> &str {
103 "Observability service: cost tracking, security monitor, optional REST API"
104 }
105
106 async fn init(
107 &self,
108 _config: HashMap<String, String>,
109 runtime_any: Arc<dyn Any + Send + Sync>,
110 ) -> crate::Result<()> {
111 let cfg = ObservabilityConfig::from_env();
112
113 if let Some(rt_arc) = runtime_any.downcast_ref::<Arc<RwLock<AgentRuntime>>>() {
115 let db_opt = {
116 let rt = rt_arc.read().unwrap();
117 rt.get_adapter()
118 };
119 let obs = Observability::new(cfg, db_opt);
120 obs.initialize()
121 .await
122 .map_err(|e| crate::ZoeyError::other(e.to_string()))?;
123 }
124
125 Ok(())
126 }
127}
128
129impl Observability {
130 pub fn set_rate_limit(&self, provider: &str, rl: ProviderRateLimit) {
131 if let Ok(mut m) = self.rate_limits.write() {
132 m.insert(provider.to_string(), rl);
133 }
134 }
135
136 pub fn get_rate_limit_remaining(&self, provider: &str) -> Option<u32> {
137 self.rate_limits
138 .read()
139 .ok()
140 .and_then(|m| m.get(provider).and_then(|rl| rl.remaining))
141 }
142}