agentic_codebase/collective/
registry.rs1use std::collections::HashMap;
7use std::path::PathBuf;
8use std::time::{Duration, Instant};
9
10use serde::{Deserialize, Serialize};
11
12use super::delta::CollectiveDelta;
13use super::patterns::UsagePattern;
14
/// Cache-maintenance interval, in seconds, used by the desktop profile
/// (five minutes).
const DEFAULT_CACHE_MAINTENANCE_SECS: u64 = 5 * 60;

/// Registry operations-per-minute ceiling used by the desktop profile.
const DEFAULT_SLA_MAX_REGISTRY_OPS_PER_MIN: u32 = 1_200;

/// Fallback interval, in seconds, between health-ledger writes when the
/// environment does not override it.
const DEFAULT_HEALTH_LEDGER_EMIT_SECS: u64 = 30;
18
/// Named tuning preset that supplies default maintenance and SLA values.
#[derive(Clone, Copy, Debug)]
enum AutonomicProfile {
    /// Preset whose defaults come from the `DEFAULT_*` constants.
    Desktop,
    /// Preset with a shorter maintenance interval and a higher ops ceiling
    /// than `Desktop`.
    Cloud,
    /// Preset with the shortest maintenance interval and the highest ops
    /// ceiling.
    Aggressive,
}
25
26impl AutonomicProfile {
27 fn from_env(name: &str) -> Self {
28 let raw = read_env_string(name).unwrap_or_else(|| "desktop".to_string());
29 match raw.trim().to_ascii_lowercase().as_str() {
30 "cloud" => Self::Cloud,
31 "aggressive" => Self::Aggressive,
32 _ => Self::Desktop,
33 }
34 }
35
36 fn cache_maintenance_secs(self) -> u64 {
37 match self {
38 Self::Desktop => DEFAULT_CACHE_MAINTENANCE_SECS,
39 Self::Cloud => 120,
40 Self::Aggressive => 60,
41 }
42 }
43
44 fn sla_max_registry_ops_per_min(self) -> u32 {
45 match self {
46 Self::Desktop => DEFAULT_SLA_MAX_REGISTRY_OPS_PER_MIN,
47 Self::Cloud => 4000,
48 Self::Aggressive => 6000,
49 }
50 }
51
52 fn as_str(self) -> &'static str {
53 match self {
54 Self::Desktop => "desktop",
55 Self::Cloud => "cloud",
56 Self::Aggressive => "aggressive",
57 }
58 }
59}
60
61#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
63pub enum RegistryMode {
64 Online,
66 Offline,
68}
69
70impl std::fmt::Display for RegistryMode {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 match self {
73 Self::Online => write!(f, "online"),
74 Self::Offline => write!(f, "offline"),
75 }
76 }
77}
78
/// A cached value together with the bookkeeping needed to expire it.
#[derive(Clone, Debug)]
struct CacheEntry<T> {
    /// The cached payload.
    value: T,
    /// Moment the entry was stored.
    inserted_at: Instant,
    /// How long after `inserted_at` the entry remains valid.
    ttl: Duration,
}
89
90impl<T> CacheEntry<T> {
91 fn is_expired(&self) -> bool {
93 self.inserted_at.elapsed() > self.ttl
94 }
95}
96
97#[derive(Debug)]
99pub struct CollectiveCache {
100 patterns: HashMap<String, CacheEntry<Vec<UsagePattern>>>,
102 default_ttl: Duration,
104}
105
106impl CollectiveCache {
107 pub fn new(default_ttl: Duration) -> Self {
109 Self {
110 patterns: HashMap::new(),
111 default_ttl,
112 }
113 }
114
115 pub fn get_patterns(&self, key: &str) -> Option<&[UsagePattern]> {
117 self.patterns
118 .get(key)
119 .filter(|entry| !entry.is_expired())
120 .map(|entry| entry.value.as_slice())
121 }
122
123 pub fn put_patterns(&mut self, key: String, patterns: Vec<UsagePattern>) {
125 self.patterns.insert(
126 key,
127 CacheEntry {
128 value: patterns,
129 inserted_at: Instant::now(),
130 ttl: self.default_ttl,
131 },
132 );
133 }
134
135 pub fn evict_expired(&mut self) {
137 self.patterns.retain(|_, entry| !entry.is_expired());
138 }
139
140 pub fn clear(&mut self) {
142 self.patterns.clear();
143 }
144
145 pub fn len(&self) -> usize {
147 self.patterns.len()
148 }
149
150 pub fn is_empty(&self) -> bool {
152 self.patterns.is_empty()
153 }
154}
155
156impl Default for CollectiveCache {
157 fn default() -> Self {
158 Self::new(Duration::from_secs(300))
159 }
160}
161
162#[derive(Debug)]
168pub struct RegistryClient {
169 mode: RegistryMode,
171 cache: CollectiveCache,
173 endpoint: Option<String>,
175 last_cache_maintenance: Instant,
177 cache_maintenance_interval: Duration,
179 profile: AutonomicProfile,
181 sla_max_registry_ops_per_min: u32,
183 ops_window_started: Instant,
185 ops_window_count: u32,
187 cache_maintenance_throttle_count: u64,
189 last_health_ledger_emit: Instant,
191 health_ledger_emit_interval: Duration,
193}
194
195impl RegistryClient {
196 pub fn offline() -> Self {
198 let profile = AutonomicProfile::from_env("ACB_AUTONOMIC_PROFILE");
199 let cache_maintenance_interval = Duration::from_secs(read_env_u64(
200 "ACB_COLLECTIVE_CACHE_MAINTENANCE_SECS",
201 profile.cache_maintenance_secs(),
202 ));
203 let health_ledger_emit_interval = Duration::from_secs(
204 read_env_u64(
205 "ACB_HEALTH_LEDGER_EMIT_SECS",
206 DEFAULT_HEALTH_LEDGER_EMIT_SECS,
207 )
208 .max(5),
209 );
210 Self {
211 mode: RegistryMode::Offline,
212 cache: CollectiveCache::default(),
213 endpoint: None,
214 last_cache_maintenance: Instant::now(),
215 cache_maintenance_interval,
216 profile,
217 sla_max_registry_ops_per_min: read_env_u32(
218 "ACB_SLA_MAX_REGISTRY_OPS_PER_MIN",
219 profile.sla_max_registry_ops_per_min(),
220 )
221 .max(1),
222 ops_window_started: Instant::now(),
223 ops_window_count: 0,
224 cache_maintenance_throttle_count: 0,
225 last_health_ledger_emit: Instant::now()
226 .checked_sub(health_ledger_emit_interval)
227 .unwrap_or_else(Instant::now),
228 health_ledger_emit_interval,
229 }
230 }
231
232 pub fn online(endpoint: String) -> Self {
237 let profile = AutonomicProfile::from_env("ACB_AUTONOMIC_PROFILE");
238 let cache_maintenance_interval = Duration::from_secs(read_env_u64(
239 "ACB_COLLECTIVE_CACHE_MAINTENANCE_SECS",
240 profile.cache_maintenance_secs(),
241 ));
242 let health_ledger_emit_interval = Duration::from_secs(
243 read_env_u64(
244 "ACB_HEALTH_LEDGER_EMIT_SECS",
245 DEFAULT_HEALTH_LEDGER_EMIT_SECS,
246 )
247 .max(5),
248 );
249 Self {
250 mode: RegistryMode::Online,
251 cache: CollectiveCache::default(),
252 endpoint: Some(endpoint),
253 last_cache_maintenance: Instant::now(),
254 cache_maintenance_interval,
255 profile,
256 sla_max_registry_ops_per_min: read_env_u32(
257 "ACB_SLA_MAX_REGISTRY_OPS_PER_MIN",
258 profile.sla_max_registry_ops_per_min(),
259 )
260 .max(1),
261 ops_window_started: Instant::now(),
262 ops_window_count: 0,
263 cache_maintenance_throttle_count: 0,
264 last_health_ledger_emit: Instant::now()
265 .checked_sub(health_ledger_emit_interval)
266 .unwrap_or_else(Instant::now),
267 health_ledger_emit_interval,
268 }
269 }
270
271 pub fn mode(&self) -> &RegistryMode {
273 &self.mode
274 }
275
276 pub fn endpoint(&self) -> Option<&str> {
278 self.endpoint.as_deref()
279 }
280
281 pub fn query_patterns(&mut self, language: &str, category: &str) -> Vec<UsagePattern> {
286 self.record_operation();
287 self.maybe_run_cache_maintenance();
288 let cache_key = format!("{}:{}", language, category);
289
290 if let Some(cached) = self.cache.get_patterns(&cache_key) {
292 return cached.to_vec();
293 }
294
295 match self.mode {
296 RegistryMode::Offline => {
297 tracing::debug!(
298 "Registry in offline mode; returning empty patterns for {}:{}.",
299 language,
300 category
301 );
302 Vec::new()
303 }
304 RegistryMode::Online => {
305 tracing::debug!(
307 "Registry online query for {}:{} (not yet implemented).",
308 language,
309 category
310 );
311 Vec::new()
312 }
313 }
314 }
315
316 pub fn publish_delta(&mut self, _delta: &CollectiveDelta) -> bool {
321 self.record_operation();
322 self.maybe_run_cache_maintenance();
323 match self.mode {
324 RegistryMode::Offline => {
325 tracing::debug!("Registry in offline mode; delta silently dropped.");
326 true
327 }
328 RegistryMode::Online => {
329 tracing::debug!("Registry publish (not yet implemented).");
330 true
331 }
332 }
333 }
334
335 pub fn cache(&self) -> &CollectiveCache {
337 &self.cache
338 }
339
340 pub fn cache_mut(&mut self) -> &mut CollectiveCache {
342 &mut self.cache
343 }
344
345 pub fn maybe_run_cache_maintenance(&mut self) {
347 if self.last_cache_maintenance.elapsed() < self.cache_maintenance_interval {
348 return;
349 }
350 if self.should_throttle_maintenance() {
351 self.cache_maintenance_throttle_count =
352 self.cache_maintenance_throttle_count.saturating_add(1);
353 self.last_cache_maintenance = Instant::now();
354 self.emit_health_ledger("throttled", 0);
355 tracing::debug!(
356 "collective cache maintenance throttled: ops_per_min={} threshold={}",
357 self.registry_ops_per_min(),
358 self.sla_max_registry_ops_per_min
359 );
360 return;
361 }
362
363 let before = self.cache.len();
364 self.cache.evict_expired();
365 let after = self.cache.len();
366 self.last_cache_maintenance = Instant::now();
367 let evicted = before.saturating_sub(after);
368 self.emit_health_ledger("normal", evicted);
369
370 if after < before {
371 tracing::debug!(
372 "collective cache maintenance evicted {} expired entries",
373 evicted
374 );
375 }
376 }
377
378 fn record_operation(&mut self) {
379 if self.ops_window_started.elapsed() >= Duration::from_secs(60) {
380 self.ops_window_started = Instant::now();
381 self.ops_window_count = 0;
382 }
383 self.ops_window_count = self.ops_window_count.saturating_add(1);
384 }
385
386 fn registry_ops_per_min(&self) -> u32 {
387 let elapsed = self.ops_window_started.elapsed().as_secs().max(1);
388 let scaled = (self.ops_window_count as u64)
389 .saturating_mul(60)
390 .saturating_div(elapsed);
391 scaled.min(u32::MAX as u64) as u32
392 }
393
394 fn should_throttle_maintenance(&self) -> bool {
395 self.registry_ops_per_min() > self.sla_max_registry_ops_per_min
396 }
397
398 fn emit_health_ledger(&mut self, maintenance_mode: &str, evicted: usize) {
399 if self.last_health_ledger_emit.elapsed() < self.health_ledger_emit_interval {
400 return;
401 }
402
403 let dir = resolve_health_ledger_dir();
404 if std::fs::create_dir_all(&dir).is_err() {
405 return;
406 }
407 let path = dir.join("agentic-codebase.json");
408 let tmp = dir.join("agentic-codebase.json.tmp");
409 let payload = serde_json::json!({
410 "project": "AgenticCodebase",
411 "timestamp": chrono::Utc::now().to_rfc3339(),
412 "status": "ok",
413 "autonomic": {
414 "profile": self.profile.as_str(),
415 "mode": self.mode.to_string(),
416 "maintenance_mode": maintenance_mode,
417 "cache_maintenance_secs": self.cache_maintenance_interval.as_secs(),
418 "throttle_count": self.cache_maintenance_throttle_count,
419 },
420 "sla": {
421 "registry_ops_per_min": self.registry_ops_per_min(),
422 "max_registry_ops_per_min": self.sla_max_registry_ops_per_min
423 },
424 "cache": {
425 "entries": self.cache.len(),
426 "evicted": evicted
427 },
428 });
429 let Ok(bytes) = serde_json::to_vec_pretty(&payload) else {
430 return;
431 };
432 if std::fs::write(&tmp, bytes).is_err() {
433 return;
434 }
435 if std::fs::rename(&tmp, &path).is_err() {
436 return;
437 }
438 self.last_health_ledger_emit = Instant::now();
439 }
440}
441
/// Reads environment variable `name` as a `u64`.
///
/// Surrounding whitespace is ignored, so a value such as `" 300\n"` parses
/// as `300`. Returns `default_value` when the variable is unset, is not
/// valid Unicode, or does not parse as a `u64`.
fn read_env_u64(name: &str, default_value: u64) -> u64 {
    std::env::var(name)
        .ok()
        // Integer parsing rejects whitespace, so trim before parsing.
        .and_then(|raw| raw.trim().parse::<u64>().ok())
        .unwrap_or(default_value)
}
448
/// Reads environment variable `name` as a `u32`.
///
/// Surrounding whitespace is ignored, so a value such as `" 1200\n"`
/// parses as `1200`. Returns `default_value` when the variable is unset,
/// is not valid Unicode, or does not parse as a `u32` (including values
/// that are out of range).
fn read_env_u32(name: &str, default_value: u32) -> u32 {
    std::env::var(name)
        .ok()
        // Integer parsing rejects whitespace, so trim before parsing.
        .and_then(|raw| raw.trim().parse::<u32>().ok())
        .unwrap_or(default_value)
}
455
/// Reads environment variable `name` with surrounding whitespace removed.
///
/// Returns `None` when the variable is unset or not valid Unicode. A
/// variable that is set but blank yields `Some` of an empty string.
fn read_env_string(name: &str) -> Option<String> {
    match std::env::var(name) {
        Ok(raw) => Some(raw.trim().to_string()),
        Err(_) => None,
    }
}
459
460fn resolve_health_ledger_dir() -> PathBuf {
461 if let Some(custom) = read_env_string("ACB_HEALTH_LEDGER_DIR") {
462 if !custom.is_empty() {
463 return PathBuf::from(custom);
464 }
465 }
466 if let Some(custom) = read_env_string("AGENTRA_HEALTH_LEDGER_DIR") {
467 if !custom.is_empty() {
468 return PathBuf::from(custom);
469 }
470 }
471
472 let home = std::env::var("HOME")
473 .ok()
474 .map(PathBuf::from)
475 .unwrap_or_else(|| PathBuf::from("."));
476 home.join(".agentra").join("health-ledger")
477}