Skip to main content

agentic_codebase/collective/
registry.rs

//! Collective registry client.
//!
//! Provides a registry client for querying and publishing collective
//! intelligence data. Works in offline mode when no network is available.
5
6use std::collections::HashMap;
7use std::path::PathBuf;
8use std::time::{Duration, Instant};
9
10use serde::{Deserialize, Serialize};
11
12use super::delta::CollectiveDelta;
13use super::patterns::UsagePattern;
14
/// Seconds between cache maintenance passes for the desktop profile.
const DEFAULT_CACHE_MAINTENANCE_SECS: u64 = 5 * 60;
/// Registry operations per minute the desktop profile tolerates before
/// cache maintenance is throttled.
const DEFAULT_SLA_MAX_REGISTRY_OPS_PER_MIN: u32 = 1_200;
/// Default minimum number of seconds between health-ledger snapshots.
const DEFAULT_HEALTH_LEDGER_EMIT_SECS: u64 = 30;
18
19#[derive(Debug, Clone, Copy)]
20enum AutonomicProfile {
21    Desktop,
22    Cloud,
23    Aggressive,
24}
25
26impl AutonomicProfile {
27    fn from_env(name: &str) -> Self {
28        let raw = read_env_string(name).unwrap_or_else(|| "desktop".to_string());
29        match raw.trim().to_ascii_lowercase().as_str() {
30            "cloud" => Self::Cloud,
31            "aggressive" => Self::Aggressive,
32            _ => Self::Desktop,
33        }
34    }
35
36    fn cache_maintenance_secs(self) -> u64 {
37        match self {
38            Self::Desktop => DEFAULT_CACHE_MAINTENANCE_SECS,
39            Self::Cloud => 120,
40            Self::Aggressive => 60,
41        }
42    }
43
44    fn sla_max_registry_ops_per_min(self) -> u32 {
45        match self {
46            Self::Desktop => DEFAULT_SLA_MAX_REGISTRY_OPS_PER_MIN,
47            Self::Cloud => 4000,
48            Self::Aggressive => 6000,
49        }
50    }
51
52    fn as_str(self) -> &'static str {
53        match self {
54            Self::Desktop => "desktop",
55            Self::Cloud => "cloud",
56            Self::Aggressive => "aggressive",
57        }
58    }
59}
60
61/// Operating mode for the registry client.
62#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
63pub enum RegistryMode {
64    /// Fully online — sync with remote registry.
65    Online,
66    /// Offline — use local cache only, no network.
67    Offline,
68}
69
70impl std::fmt::Display for RegistryMode {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        match self {
73            Self::Online => write!(f, "online"),
74            Self::Offline => write!(f, "offline"),
75        }
76    }
77}
78
/// A value paired with the moment it was stored and how long it stays valid.
#[derive(Debug, Clone)]
struct CacheEntry<T> {
    /// Payload held by the cache.
    value: T,
    /// Instant at which the payload was stored.
    inserted_at: Instant,
    /// How long after `inserted_at` the payload remains valid.
    ttl: Duration,
}

impl<T> CacheEntry<T> {
    /// Returns `true` once strictly more than `ttl` has passed since insertion.
    fn is_expired(&self) -> bool {
        let age = self.inserted_at.elapsed();
        age > self.ttl
    }
}
96
97/// A TTL-based cache for collective data.
98#[derive(Debug)]
99pub struct CollectiveCache {
100    /// Cached patterns indexed by query key.
101    patterns: HashMap<String, CacheEntry<Vec<UsagePattern>>>,
102    /// Default TTL for cache entries.
103    default_ttl: Duration,
104}
105
106impl CollectiveCache {
107    /// Create a new cache with a default TTL.
108    pub fn new(default_ttl: Duration) -> Self {
109        Self {
110            patterns: HashMap::new(),
111            default_ttl,
112        }
113    }
114
115    /// Get cached patterns for a query key, if not expired.
116    pub fn get_patterns(&self, key: &str) -> Option<&[UsagePattern]> {
117        self.patterns
118            .get(key)
119            .filter(|entry| !entry.is_expired())
120            .map(|entry| entry.value.as_slice())
121    }
122
123    /// Store patterns in the cache.
124    pub fn put_patterns(&mut self, key: String, patterns: Vec<UsagePattern>) {
125        self.patterns.insert(
126            key,
127            CacheEntry {
128                value: patterns,
129                inserted_at: Instant::now(),
130                ttl: self.default_ttl,
131            },
132        );
133    }
134
135    /// Remove expired entries from the cache.
136    pub fn evict_expired(&mut self) {
137        self.patterns.retain(|_, entry| !entry.is_expired());
138    }
139
140    /// Clear all entries from the cache.
141    pub fn clear(&mut self) {
142        self.patterns.clear();
143    }
144
145    /// Get the number of (possibly expired) entries in the cache.
146    pub fn len(&self) -> usize {
147        self.patterns.len()
148    }
149
150    /// Check if the cache is empty.
151    pub fn is_empty(&self) -> bool {
152        self.patterns.is_empty()
153    }
154}
155
156impl Default for CollectiveCache {
157    fn default() -> Self {
158        Self::new(Duration::from_secs(300))
159    }
160}
161
162/// Registry client for collective intelligence data.
163///
164/// In offline mode, all queries return empty results and publish operations
165/// are silently dropped. In online mode (not yet implemented), the client
166/// would communicate with a remote registry server.
167#[derive(Debug)]
168pub struct RegistryClient {
169    /// Operating mode.
170    mode: RegistryMode,
171    /// Local cache.
172    cache: CollectiveCache,
173    /// Registry endpoint URL (used in online mode).
174    endpoint: Option<String>,
175    /// Last time periodic cache maintenance ran.
176    last_cache_maintenance: Instant,
177    /// Interval between periodic cache maintenance runs.
178    cache_maintenance_interval: Duration,
179    /// Current autonomic profile.
180    profile: AutonomicProfile,
181    /// SLA threshold for maintenance throttling.
182    sla_max_registry_ops_per_min: u32,
183    /// Start of current operation-rate window.
184    ops_window_started: Instant,
185    /// Number of operations in current window.
186    ops_window_count: u32,
187    /// Number of times maintenance has been throttled.
188    cache_maintenance_throttle_count: u64,
189    /// Last time a health-ledger snapshot was emitted.
190    last_health_ledger_emit: Instant,
191    /// Minimum interval between health-ledger snapshots.
192    health_ledger_emit_interval: Duration,
193}
194
195impl RegistryClient {
196    /// Create a new registry client in offline mode.
197    pub fn offline() -> Self {
198        let profile = AutonomicProfile::from_env("ACB_AUTONOMIC_PROFILE");
199        let cache_maintenance_interval = Duration::from_secs(read_env_u64(
200            "ACB_COLLECTIVE_CACHE_MAINTENANCE_SECS",
201            profile.cache_maintenance_secs(),
202        ));
203        let health_ledger_emit_interval = Duration::from_secs(
204            read_env_u64(
205                "ACB_HEALTH_LEDGER_EMIT_SECS",
206                DEFAULT_HEALTH_LEDGER_EMIT_SECS,
207            )
208            .max(5),
209        );
210        Self {
211            mode: RegistryMode::Offline,
212            cache: CollectiveCache::default(),
213            endpoint: None,
214            last_cache_maintenance: Instant::now(),
215            cache_maintenance_interval,
216            profile,
217            sla_max_registry_ops_per_min: read_env_u32(
218                "ACB_SLA_MAX_REGISTRY_OPS_PER_MIN",
219                profile.sla_max_registry_ops_per_min(),
220            )
221            .max(1),
222            ops_window_started: Instant::now(),
223            ops_window_count: 0,
224            cache_maintenance_throttle_count: 0,
225            last_health_ledger_emit: Instant::now()
226                .checked_sub(health_ledger_emit_interval)
227                .unwrap_or_else(Instant::now),
228            health_ledger_emit_interval,
229        }
230    }
231
232    /// Create a new registry client in online mode (stub).
233    ///
234    /// Note: Online mode is not yet implemented. The client will still
235    /// behave as offline but will store the endpoint for future use.
236    pub fn online(endpoint: String) -> Self {
237        let profile = AutonomicProfile::from_env("ACB_AUTONOMIC_PROFILE");
238        let cache_maintenance_interval = Duration::from_secs(read_env_u64(
239            "ACB_COLLECTIVE_CACHE_MAINTENANCE_SECS",
240            profile.cache_maintenance_secs(),
241        ));
242        let health_ledger_emit_interval = Duration::from_secs(
243            read_env_u64(
244                "ACB_HEALTH_LEDGER_EMIT_SECS",
245                DEFAULT_HEALTH_LEDGER_EMIT_SECS,
246            )
247            .max(5),
248        );
249        Self {
250            mode: RegistryMode::Online,
251            cache: CollectiveCache::default(),
252            endpoint: Some(endpoint),
253            last_cache_maintenance: Instant::now(),
254            cache_maintenance_interval,
255            profile,
256            sla_max_registry_ops_per_min: read_env_u32(
257                "ACB_SLA_MAX_REGISTRY_OPS_PER_MIN",
258                profile.sla_max_registry_ops_per_min(),
259            )
260            .max(1),
261            ops_window_started: Instant::now(),
262            ops_window_count: 0,
263            cache_maintenance_throttle_count: 0,
264            last_health_ledger_emit: Instant::now()
265                .checked_sub(health_ledger_emit_interval)
266                .unwrap_or_else(Instant::now),
267            health_ledger_emit_interval,
268        }
269    }
270
271    /// Get the current operating mode.
272    pub fn mode(&self) -> &RegistryMode {
273        &self.mode
274    }
275
276    /// Get the endpoint URL, if configured.
277    pub fn endpoint(&self) -> Option<&str> {
278        self.endpoint.as_deref()
279    }
280
281    /// Query patterns from the registry.
282    ///
283    /// In offline mode, always returns an empty list.
284    /// Checks cache first before making any (future) network calls.
285    pub fn query_patterns(&mut self, language: &str, category: &str) -> Vec<UsagePattern> {
286        self.record_operation();
287        self.maybe_run_cache_maintenance();
288        let cache_key = format!("{}:{}", language, category);
289
290        // Check cache first.
291        if let Some(cached) = self.cache.get_patterns(&cache_key) {
292            return cached.to_vec();
293        }
294
295        match self.mode {
296            RegistryMode::Offline => {
297                tracing::debug!(
298                    "Registry in offline mode; returning empty patterns for {}:{}.",
299                    language,
300                    category
301                );
302                Vec::new()
303            }
304            RegistryMode::Online => {
305                // Online mode is a stub: log and return empty.
306                tracing::debug!(
307                    "Registry online query for {}:{} (not yet implemented).",
308                    language,
309                    category
310                );
311                Vec::new()
312            }
313        }
314    }
315
316    /// Publish a delta to the registry.
317    ///
318    /// In offline mode, the delta is silently dropped.
319    /// Returns true if the delta was accepted (or dropped in offline mode).
320    pub fn publish_delta(&mut self, _delta: &CollectiveDelta) -> bool {
321        self.record_operation();
322        self.maybe_run_cache_maintenance();
323        match self.mode {
324            RegistryMode::Offline => {
325                tracing::debug!("Registry in offline mode; delta silently dropped.");
326                true
327            }
328            RegistryMode::Online => {
329                tracing::debug!("Registry publish (not yet implemented).");
330                true
331            }
332        }
333    }
334
335    /// Access the internal cache.
336    pub fn cache(&self) -> &CollectiveCache {
337        &self.cache
338    }
339
340    /// Access the internal cache mutably.
341    pub fn cache_mut(&mut self) -> &mut CollectiveCache {
342        &mut self.cache
343    }
344
345    /// Run cache maintenance if the maintenance interval has elapsed.
346    pub fn maybe_run_cache_maintenance(&mut self) {
347        if self.last_cache_maintenance.elapsed() < self.cache_maintenance_interval {
348            return;
349        }
350        if self.should_throttle_maintenance() {
351            self.cache_maintenance_throttle_count =
352                self.cache_maintenance_throttle_count.saturating_add(1);
353            self.last_cache_maintenance = Instant::now();
354            self.emit_health_ledger("throttled", 0);
355            tracing::debug!(
356                "collective cache maintenance throttled: ops_per_min={} threshold={}",
357                self.registry_ops_per_min(),
358                self.sla_max_registry_ops_per_min
359            );
360            return;
361        }
362
363        let before = self.cache.len();
364        self.cache.evict_expired();
365        let after = self.cache.len();
366        self.last_cache_maintenance = Instant::now();
367        let evicted = before.saturating_sub(after);
368        self.emit_health_ledger("normal", evicted);
369
370        if after < before {
371            tracing::debug!(
372                "collective cache maintenance evicted {} expired entries",
373                evicted
374            );
375        }
376    }
377
378    fn record_operation(&mut self) {
379        if self.ops_window_started.elapsed() >= Duration::from_secs(60) {
380            self.ops_window_started = Instant::now();
381            self.ops_window_count = 0;
382        }
383        self.ops_window_count = self.ops_window_count.saturating_add(1);
384    }
385
386    fn registry_ops_per_min(&self) -> u32 {
387        let elapsed = self.ops_window_started.elapsed().as_secs().max(1);
388        let scaled = (self.ops_window_count as u64)
389            .saturating_mul(60)
390            .saturating_div(elapsed);
391        scaled.min(u32::MAX as u64) as u32
392    }
393
394    fn should_throttle_maintenance(&self) -> bool {
395        self.registry_ops_per_min() > self.sla_max_registry_ops_per_min
396    }
397
398    fn emit_health_ledger(&mut self, maintenance_mode: &str, evicted: usize) {
399        if self.last_health_ledger_emit.elapsed() < self.health_ledger_emit_interval {
400            return;
401        }
402
403        let dir = resolve_health_ledger_dir();
404        if std::fs::create_dir_all(&dir).is_err() {
405            return;
406        }
407        let path = dir.join("agentic-codebase.json");
408        let tmp = dir.join("agentic-codebase.json.tmp");
409        let payload = serde_json::json!({
410            "project": "AgenticCodebase",
411            "timestamp": chrono::Utc::now().to_rfc3339(),
412            "status": "ok",
413            "autonomic": {
414                "profile": self.profile.as_str(),
415                "mode": self.mode.to_string(),
416                "maintenance_mode": maintenance_mode,
417                "cache_maintenance_secs": self.cache_maintenance_interval.as_secs(),
418                "throttle_count": self.cache_maintenance_throttle_count,
419            },
420            "sla": {
421                "registry_ops_per_min": self.registry_ops_per_min(),
422                "max_registry_ops_per_min": self.sla_max_registry_ops_per_min
423            },
424            "cache": {
425                "entries": self.cache.len(),
426                "evicted": evicted
427            },
428        });
429        let Ok(bytes) = serde_json::to_vec_pretty(&payload) else {
430            return;
431        };
432        if std::fs::write(&tmp, bytes).is_err() {
433            return;
434        }
435        if std::fs::rename(&tmp, &path).is_err() {
436            return;
437        }
438        self.last_health_ledger_emit = Instant::now();
439    }
440}
441
/// Read environment variable `name` as a `u64`.
///
/// Surrounding whitespace is ignored, matching `read_env_string`. Returns
/// `default_value` when the variable is unset, is not valid Unicode, or does
/// not parse as a `u64`.
fn read_env_u64(name: &str, default_value: u64) -> u64 {
    match std::env::var(name) {
        // Integer parsing rejects whitespace, so trim first; otherwise a
        // value like " 300 " would silently fall back to the default.
        Ok(raw) => raw.trim().parse().unwrap_or(default_value),
        Err(_) => default_value,
    }
}
448
/// Read environment variable `name` as a `u32`.
///
/// Surrounding whitespace is ignored, matching `read_env_string`. Returns
/// `default_value` when the variable is unset, is not valid Unicode, or does
/// not parse as a `u32` (including values too large for `u32`).
fn read_env_u32(name: &str, default_value: u32) -> u32 {
    match std::env::var(name) {
        // Integer parsing rejects whitespace, so trim first; otherwise a
        // value like " 4000 " would silently fall back to the default.
        Ok(raw) => raw.trim().parse().unwrap_or(default_value),
        Err(_) => default_value,
    }
}
455
/// Read environment variable `name` with surrounding whitespace removed.
///
/// Returns `None` when the variable is unset or not valid Unicode. A
/// variable that is set but blank yields `Some("")`.
fn read_env_string(name: &str) -> Option<String> {
    match std::env::var(name) {
        Ok(raw) => Some(raw.trim().to_owned()),
        Err(_) => None,
    }
}
459
460fn resolve_health_ledger_dir() -> PathBuf {
461    if let Some(custom) = read_env_string("ACB_HEALTH_LEDGER_DIR") {
462        if !custom.is_empty() {
463            return PathBuf::from(custom);
464        }
465    }
466    if let Some(custom) = read_env_string("AGENTRA_HEALTH_LEDGER_DIR") {
467        if !custom.is_empty() {
468            return PathBuf::from(custom);
469        }
470    }
471
472    let home = std::env::var("HOME")
473        .ok()
474        .map(PathBuf::from)
475        .unwrap_or_else(|| PathBuf::from("."));
476    home.join(".agentra").join("health-ledger")
477}