Skip to main content

feagi_services/types/
agent_registry.rs

1// Copyright 2025 Neuraville Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Agent registry types - shared between feagi-services and feagi-io
5
6use serde::{Deserialize, Serialize};
7use std::collections::hash_map::DefaultHasher;
8use std::hash::{Hash, Hasher};
9
10/// Type of agent based on I/O direction and purpose
11#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
12#[serde(rename_all = "lowercase")]
13pub enum AgentType {
14    /// Agent provides sensory input to FEAGI
15    Sensory,
16    /// Agent receives motor output from FEAGI
17    Motor,
18    /// Agent both sends and receives data
19    Both,
20    /// Agent consumes visualization stream only (e.g., Brain Visualizer clients)
21    Visualization,
22    /// Infrastructure agent (e.g., bridges, proxies) - needs viz + control streams
23    Infrastructure,
24}
25
26impl std::fmt::Display for AgentType {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        match self {
29            AgentType::Sensory => write!(f, "sensory"),
30            AgentType::Motor => write!(f, "motor"),
31            AgentType::Both => write!(f, "both"),
32            AgentType::Visualization => write!(f, "visualization"),
33            AgentType::Infrastructure => write!(f, "infrastructure"),
34        }
35    }
36}
37
38impl std::str::FromStr for AgentType {
39    type Err = String;
40
41    fn from_str(s: &str) -> Result<Self, Self::Err> {
42        match s.to_lowercase().as_str() {
43            "sensory" => Ok(AgentType::Sensory),
44            "motor" => Ok(AgentType::Motor),
45            "both" => Ok(AgentType::Both),
46            "visualization" => Ok(AgentType::Visualization),
47            "infrastructure" => Ok(AgentType::Infrastructure),
48            _ => Err(format!("Invalid agent type: {}", s)),
49        }
50    }
51}
52
53/// Vision input capability
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct VisionCapability {
56    /// Type of vision sensor (camera, lidar, depth, etc.)
57    pub modality: String,
58    /// Frame dimensions [width, height]
59    pub dimensions: (usize, usize),
60    /// Number of channels (1=grayscale, 3=RGB, 4=RGBA)
61    pub channels: usize,
62    /// Target cortical area ID
63    pub target_cortical_area: String,
64    /// Semantic unit identifier (preferred).
65    ///
66    /// When set, the backend can derive cortical IDs without requiring agents to know
67    /// internal 3-letter unit designators (e.g., "svi"/"seg").
68    #[serde(skip_serializing_if = "Option::is_none")]
69    pub unit: Option<SensoryUnit>,
70    /// Cortical unit index (group) for the selected unit (preferred).
71    ///
72    /// FEAGI encodes the group in the cortical ID. This keeps the wire contract
73    /// language-agnostic and avoids leaking internal byte layouts to SDK users.
74    #[serde(skip_serializing_if = "Option::is_none")]
75    pub group: Option<u8>,
76}
77
78/// Motor output capability
79#[derive(Debug, Clone, Serialize, Deserialize)]
80pub struct MotorCapability {
81    /// Type of motor (servo, stepper, dc, etc.)
82    pub modality: String,
83    /// Number of motor outputs
84    pub output_count: usize,
85    /// Source cortical area IDs
86    pub source_cortical_areas: Vec<String>,
87    /// Semantic unit identifier (preferred).
88    #[serde(skip_serializing_if = "Option::is_none")]
89    pub unit: Option<MotorUnit>,
90    /// Cortical unit index (group) for the selected unit (preferred).
91    #[serde(skip_serializing_if = "Option::is_none")]
92    pub group: Option<u8>,
93    /// Multiple semantic motor unit sources (preferred for multi-OPU agents).
94    ///
95    /// This supports agents that subscribe to multiple motor cortical unit types
96    /// (e.g., object_segmentation + simple_vision_output + text_english_output).
97    #[serde(skip_serializing_if = "Option::is_none")]
98    pub source_units: Option<Vec<MotorUnitSpec>>,
99}
100
101/// Motor unit + group pair for registration contracts.
102#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
103pub struct MotorUnitSpec {
104    pub unit: MotorUnit,
105    pub group: u8,
106}
107
108/// Language-agnostic sensory unit identifiers for registration contracts.
109///
110/// These are **not** the same as the internal 3-letter unit designators. They are intended
111/// to be stable, human-readable, and suitable for auto-generated SDKs in other languages.
112#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
113#[serde(rename_all = "snake_case")]
114pub enum SensoryUnit {
115    Infrared,
116    Proximity,
117    Shock,
118    Battery,
119    Servo,
120    AnalogGpio,
121    DigitalGpio,
122    MiscData,
123    TextEnglishInput,
124    CountInput,
125    Vision,
126    SegmentedVision,
127    Accelerometer,
128    Gyroscope,
129}
130
131/// Language-agnostic motor unit identifiers for registration contracts.
132#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
133#[serde(rename_all = "snake_case")]
134pub enum MotorUnit {
135    RotaryMotor,
136    PositionalServo,
137    Gaze,
138    MiscData,
139    TextEnglishOutput,
140    CountOutput,
141    ObjectSegmentation,
142    SimpleVisionOutput,
143}
144
145/// Visualization capability for agents that consume neural activity stream
146#[derive(Debug, Clone, Serialize, Deserialize)]
147pub struct VisualizationCapability {
148    /// Type of visualization (3d_brain, 2d_plot, neural_graph, etc.)
149    pub visualization_type: String,
150    /// Display resolution if applicable
151    #[serde(skip_serializing_if = "Option::is_none")]
152    pub resolution: Option<(usize, usize)>,
153    /// Refresh rate in Hz if applicable
154    #[serde(skip_serializing_if = "Option::is_none")]
155    pub refresh_rate: Option<f64>,
156    /// Whether this is a bridge/proxy agent (vs direct consumer)
157    #[serde(default)]
158    pub bridge_proxy: bool,
159}
160
161/// Sensory capability for non-vision sensory modalities (text, audio, etc.)
162#[derive(Debug, Clone, Serialize, Deserialize)]
163pub struct SensoryCapability {
164    pub rate_hz: f64,
165    pub shm_path: Option<String>,
166}
167
168/// Agent capabilities describing what data it can provide/consume
169#[derive(Debug, Clone, Default, Serialize, Deserialize)]
170pub struct AgentCapabilities {
171    /// Vision input capability
172    #[serde(skip_serializing_if = "Option::is_none")]
173    pub vision: Option<VisionCapability>,
174
175    /// Motor output capability
176    #[serde(skip_serializing_if = "Option::is_none")]
177    pub motor: Option<MotorCapability>,
178
179    /// Visualization capability
180    #[serde(skip_serializing_if = "Option::is_none")]
181    pub visualization: Option<VisualizationCapability>,
182
183    /// Legacy sensory capability (for backward compatibility)
184    #[serde(skip_serializing_if = "Option::is_none")]
185    pub sensory: Option<SensoryCapability>,
186
187    /// Custom capabilities (extensible for audio, tactile, etc.)
188    #[serde(flatten)]
189    pub custom: serde_json::Map<String, serde_json::Value>,
190}
191
192/// Transport mechanism for agent communication
193#[derive(Debug, Clone, PartialEq, Hash, Serialize, Deserialize)]
194#[serde(rename_all = "lowercase")]
195pub enum AgentTransport {
196    Zmq,
197    Shm,
198    Hybrid, // Uses both ZMQ and SHM
199}
200
201/// Complete agent information
202#[derive(Debug, Clone, Serialize, Deserialize)]
203pub struct AgentInfo {
204    /// Unique agent identifier
205    pub agent_id: String,
206
207    /// Agent type (sensory, motor, or both)
208    pub agent_type: AgentType,
209
210    /// Agent capabilities
211    pub capabilities: AgentCapabilities,
212
213    /// Transport method the agent chose to use
214    pub chosen_transport: Option<String>, // "zmq", "websocket", "shm", or "hybrid"
215
216    /// Registration timestamp (Unix epoch milliseconds)
217    pub registered_at: u64,
218
219    /// Last activity timestamp (Unix epoch milliseconds)
220    pub last_seen: u64,
221
222    /// Transport mechanism
223    pub transport: AgentTransport,
224
225    /// Metadata (client version, hostname, etc.)
226    #[serde(default)]
227    pub metadata: serde_json::Map<String, serde_json::Value>,
228}
229
230impl AgentInfo {
231    /// Create a new agent info with current timestamp
232    pub fn new(
233        agent_id: String,
234        agent_type: AgentType,
235        capabilities: AgentCapabilities,
236        transport: AgentTransport,
237    ) -> Self {
238        let now = std::time::SystemTime::now()
239            .duration_since(std::time::UNIX_EPOCH)
240            .unwrap()
241            .as_millis() as u64;
242
243        Self {
244            agent_id,
245            agent_type,
246            capabilities,
247            chosen_transport: None, // Set later when agent reports back
248            registered_at: now,
249            last_seen: now,
250            transport,
251            metadata: serde_json::Map::new(),
252        }
253    }
254
255    /// Update last_seen timestamp to current time
256    pub fn update_activity(&mut self) {
257        self.last_seen = std::time::SystemTime::now()
258            .duration_since(std::time::UNIX_EPOCH)
259            .unwrap()
260            .as_millis() as u64;
261    }
262
263    /// Check if agent has been inactive for more than timeout_ms
264    pub fn is_inactive(&self, timeout_ms: u64) -> bool {
265        let now = std::time::SystemTime::now()
266            .duration_since(std::time::UNIX_EPOCH)
267            .unwrap()
268            .as_millis() as u64;
269
270        now - self.last_seen > timeout_ms
271    }
272}
273
274/// Agent Registry - single source of truth for agent management
275pub struct AgentRegistry {
276    agents: std::collections::HashMap<String, AgentInfo>,
277    max_agents: usize,
278    timeout_ms: u64,
279}
280
281impl AgentRegistry {
282    /// Create a new agent registry
283    ///
284    /// # Arguments
285    /// * `max_agents` - Maximum number of concurrent agents (default: 100)
286    /// * `timeout_ms` - Inactivity timeout in milliseconds (default: 60000)
287    pub fn new(max_agents: usize, timeout_ms: u64) -> Self {
288        tracing::info!(
289            "🦀 [REGISTRY] Initialized (max_agents={}, timeout_ms={})",
290            max_agents,
291            timeout_ms
292        );
293        Self {
294            agents: std::collections::HashMap::new(),
295            max_agents,
296            timeout_ms,
297        }
298    }
299
300    /// Create registry with default settings (100 agents, 60s timeout)
301    pub fn with_defaults() -> Self {
302        Self::new(100, 60000)
303    }
304
305    /// Register a new agent
306    pub fn register(&mut self, agent_info: AgentInfo) -> Result<(), String> {
307        let agent_id = agent_info.agent_id.clone();
308
309        // Validate configuration
310        self.validate_agent_config(&agent_id, &agent_info.agent_type, &agent_info.capabilities)?;
311
312        // Check if already registered (allow re-registration)
313        let is_reregistration = self.agents.contains_key(&agent_id);
314        if is_reregistration {
315            tracing::warn!(
316                "⚠️ [REGISTRY] Agent re-registering (updating existing entry): {}",
317                agent_id
318            );
319        } else {
320            // Check capacity only for new registrations
321            if self.agents.len() >= self.max_agents {
322                return Err(format!(
323                    "Registry full ({}/{})",
324                    self.agents.len(),
325                    self.max_agents
326                ));
327            }
328        }
329
330        tracing::info!(
331            "🦀 [REGISTRY] Registered agent: {} (type: {}, total: {})",
332            agent_id,
333            agent_info.agent_type,
334            self.agents.len() + if is_reregistration { 0 } else { 1 }
335        );
336        self.agents.insert(agent_id, agent_info);
337        self.refresh_agent_data_hash();
338        Ok(())
339    }
340
341    /// Deregister an agent
342    pub fn deregister(&mut self, agent_id: &str) -> Result<(), String> {
343        if self.agents.remove(agent_id).is_some() {
344            tracing::info!(
345                "🦀 [REGISTRY] Deregistered agent: {} (total: {})",
346                agent_id,
347                self.agents.len()
348            );
349            self.refresh_agent_data_hash();
350            Ok(())
351        } else {
352            Err(format!("Agent {} not found", agent_id))
353        }
354    }
355
356    /// Update heartbeat for an agent
357    pub fn heartbeat(&mut self, agent_id: &str) -> Result<(), String> {
358        use tracing::debug;
359
360        if let Some(agent) = self.agents.get_mut(agent_id) {
361            let old_last_seen = agent.last_seen;
362            agent.update_activity();
363            let new_last_seen = agent.last_seen;
364
365            debug!(
366                "💓 [REGISTRY] Heartbeat updated for '{}': last_seen {} -> {} (+{}ms)",
367                agent_id,
368                old_last_seen,
369                new_last_seen,
370                new_last_seen.saturating_sub(old_last_seen)
371            );
372
373            Ok(())
374        } else {
375            Err(format!("Agent {} not found", agent_id))
376        }
377    }
378
379    /// Get agent info
380    pub fn get(&self, agent_id: &str) -> Option<&AgentInfo> {
381        self.agents.get(agent_id)
382    }
383
384    /// Get all agents
385    pub fn get_all(&self) -> Vec<&AgentInfo> {
386        self.agents.values().collect()
387    }
388
389    /// Get agents with stale heartbeats (older than configured timeout)
390    pub fn get_stale_agents(&self) -> Vec<String> {
391        self.agents
392            .iter()
393            .filter(|(_, info)| info.is_inactive(self.timeout_ms))
394            .map(|(id, _)| id.clone())
395            .collect()
396    }
397
398    /// Prune inactive agents
399    ///
400    /// # Returns
401    /// Number of agents pruned
402    pub fn prune_inactive_agents(&mut self) -> usize {
403        let inactive: Vec<String> = self
404            .agents
405            .iter()
406            .filter(|(_, info)| info.is_inactive(self.timeout_ms))
407            .map(|(id, _)| id.clone())
408            .collect();
409
410        let count = inactive.len();
411        for agent_id in &inactive {
412            self.agents.remove(agent_id);
413            tracing::info!("🦀 [REGISTRY] Pruned inactive agent: {}", agent_id);
414        }
415
416        if count > 0 {
417            tracing::info!(
418                "🦀 [REGISTRY] Pruned {} inactive agents (total: {})",
419                count,
420                self.agents.len()
421            );
422            self.refresh_agent_data_hash();
423        }
424
425        count
426    }
427
428    /// Get number of registered agents
429    pub fn count(&self) -> usize {
430        self.agents.len()
431    }
432
433    fn refresh_agent_data_hash(&self) {
434        #[cfg(feature = "std")]
435        {
436            use feagi_state_manager::StateManager;
437            let mut agent_ids: Vec<&String> = self.agents.keys().collect();
438            agent_ids.sort();
439            let mut hasher = DefaultHasher::new();
440            for agent_id in agent_ids {
441                agent_id.hash(&mut hasher);
442                if let Some(agent) = self.agents.get(agent_id) {
443                    agent.agent_id.hash(&mut hasher);
444                    agent.agent_type.hash(&mut hasher);
445                    agent.transport.hash(&mut hasher);
446                    hash_optional_string(&agent.chosen_transport, &mut hasher);
447                    hash_json_value(
448                        &serde_json::Value::Object(agent.metadata.clone()),
449                        &mut hasher,
450                    );
451                    if let Ok(value) = serde_json::to_value(&agent.capabilities) {
452                        hash_json_value(&value, &mut hasher);
453                    }
454                }
455            }
456            let hash_value = hasher.finish() & AGENT_HASH_SAFE_MASK;
457            if let Some(state_manager) = StateManager::instance().try_write() {
458                state_manager.set_agent_data_hash(hash_value);
459            }
460        }
461    }
462
463    /// Check if any agent has sensory capability (for stream gating)
464    pub fn has_sensory_agents(&self) -> bool {
465        self.agents.values().any(|agent| {
466            agent.capabilities.sensory.is_some() || agent.capabilities.vision.is_some()
467        })
468    }
469
470    /// Check if any agent has motor capability (for stream gating)
471    pub fn has_motor_agents(&self) -> bool {
472        self.agents
473            .values()
474            .any(|agent| agent.capabilities.motor.is_some())
475    }
476
477    /// Check if any agent has visualization capability (for stream gating)
478    pub fn has_visualization_agents(&self) -> bool {
479        self.agents
480            .values()
481            .any(|agent| agent.capabilities.visualization.is_some())
482    }
483
484    /// Get count of agents with sensory capability
485    pub fn count_sensory_agents(&self) -> usize {
486        self.agents
487            .values()
488            .filter(|agent| {
489                agent.capabilities.sensory.is_some() || agent.capabilities.vision.is_some()
490            })
491            .count()
492    }
493
494    /// Get count of agents with motor capability
495    pub fn count_motor_agents(&self) -> usize {
496        self.agents
497            .values()
498            .filter(|agent| agent.capabilities.motor.is_some())
499            .count()
500    }
501
502    /// Get count of agents with visualization capability
503    pub fn count_visualization_agents(&self) -> usize {
504        self.agents
505            .values()
506            .filter(|agent| agent.capabilities.visualization.is_some())
507            .count()
508    }
509
510    /// Get the configured timeout threshold in milliseconds
511    pub fn get_timeout_ms(&self) -> u64 {
512        self.timeout_ms
513    }
514
515    /// Validate agent configuration
516    fn validate_agent_config(
517        &self,
518        agent_id: &str,
519        agent_type: &AgentType,
520        capabilities: &AgentCapabilities,
521    ) -> Result<(), String> {
522        // Agent ID must not be empty
523        if agent_id.is_empty() {
524            return Err("Agent ID cannot be empty".to_string());
525        }
526
527        // Validate capabilities match agent type
528        match agent_type {
529            AgentType::Sensory => {
530                if capabilities.vision.is_none()
531                    && capabilities.sensory.is_none()
532                    && capabilities.custom.is_empty()
533                {
534                    return Err("Sensory agent must have at least one input capability".to_string());
535                }
536            }
537            AgentType::Motor => {
538                if capabilities.motor.is_none() {
539                    return Err("Motor agent must have motor capability".to_string());
540                }
541            }
542            AgentType::Both => {
543                // Both requires at least one capability of each type
544                if (capabilities.vision.is_none()
545                    && capabilities.sensory.is_none()
546                    && capabilities.custom.is_empty())
547                    || capabilities.motor.is_none()
548                {
549                    return Err(
550                        "Bidirectional agent must have both input and output capabilities"
551                            .to_string(),
552                    );
553                }
554            }
555            AgentType::Visualization => {
556                if capabilities.visualization.is_none() {
557                    return Err(
558                        "Visualization agent must have visualization capability".to_string()
559                    );
560                }
561            }
562            AgentType::Infrastructure => {
563                // Infrastructure agents are flexible - they can proxy any type
564                // Just require at least one capability to be declared
565                if capabilities.vision.is_none()
566                    && capabilities.sensory.is_none()
567                    && capabilities.motor.is_none()
568                    && capabilities.visualization.is_none()
569                    && capabilities.custom.is_empty()
570                {
571                    return Err(
572                        "Infrastructure agent must declare at least one capability".to_string()
573                    );
574                }
575            }
576        }
577
578        Ok(())
579    }
580}
581
582const AGENT_HASH_SAFE_MASK: u64 = (1u64 << 53) - 1;
583
584fn hash_optional_string(value: &Option<String>, hasher: &mut DefaultHasher) {
585    match value {
586        Some(text) => {
587            hasher.write_u8(1);
588            text.hash(hasher);
589        }
590        None => {
591            hasher.write_u8(0);
592        }
593    }
594}
595
596fn hash_json_value(value: &serde_json::Value, hasher: &mut DefaultHasher) {
597    match value {
598        serde_json::Value::Null => {
599            hasher.write_u8(0);
600        }
601        serde_json::Value::Bool(val) => {
602            hasher.write_u8(1);
603            hasher.write_u8(*val as u8);
604        }
605        serde_json::Value::Number(num) => {
606            hasher.write_u8(2);
607            num.to_string().hash(hasher);
608        }
609        serde_json::Value::String(text) => {
610            hasher.write_u8(3);
611            text.hash(hasher);
612        }
613        serde_json::Value::Array(values) => {
614            hasher.write_u8(4);
615            for item in values {
616                hash_json_value(item, hasher);
617            }
618        }
619        serde_json::Value::Object(map) => {
620            hasher.write_u8(5);
621            let mut keys: Vec<&String> = map.keys().collect();
622            keys.sort();
623            for key in keys {
624                key.hash(hasher);
625                if let Some(item) = map.get(key) {
626                    hash_json_value(item, hasher);
627                } else {
628                    hasher.write_u8(0);
629                }
630            }
631        }
632    }
633}