Skip to main content

feagi_services/impls/
agent_service_impl.rs

1// Copyright 2025 Neuraville Inc.
2// Licensed under the Apache License, Version 2.0
3
4//! Agent service implementation
5//!
6//! This implementation delegates all agent management to the PNS AgentRegistry,
7//! ensuring centralized coordination consistent with the Python implementation.
8
9use async_trait::async_trait;
10use parking_lot::RwLock;
11use std::collections::HashMap;
12use std::sync::Arc;
13use tracing::{error, info, warn};
14
15use crate::traits::agent_service::*;
16use crate::traits::registration_handler::RegistrationHandlerTrait;
17use crate::traits::RuntimeService as RuntimeServiceTrait;
18use crate::types::agent_registry::AgentRegistry;
19use crate::types::registration::RegistrationRequest;
20use feagi_brain_development::ConnectomeManager;
21use feagi_structures::genomic::cortical_area::CorticalID;
22
23/// Implementation of the Agent service
24pub struct AgentServiceImpl {
25    connectome_manager: Arc<RwLock<ConnectomeManager>>,
26    agent_registry: Arc<RwLock<AgentRegistry>>,
27    registration_handler: Option<Arc<dyn RegistrationHandlerTrait>>,
28    runtime_service: Arc<RwLock<Option<Arc<dyn RuntimeServiceTrait + Send + Sync>>>>,
29}
30
31impl AgentServiceImpl {
32    pub fn new(
33        connectome_manager: Arc<RwLock<ConnectomeManager>>,
34        agent_registry: Arc<RwLock<AgentRegistry>>,
35    ) -> Self {
36        Self {
37            connectome_manager,
38            agent_registry,
39            registration_handler: None,
40            runtime_service: Arc::new(RwLock::new(None)),
41        }
42    }
43
44    /// Create AgentServiceImpl with runtime service
45    pub fn new_with_runtime(
46        connectome_manager: Arc<RwLock<ConnectomeManager>>,
47        agent_registry: Arc<RwLock<AgentRegistry>>,
48        runtime_service: Arc<dyn RuntimeServiceTrait + Send + Sync>,
49    ) -> Self {
50        Self {
51            connectome_manager,
52            agent_registry,
53            registration_handler: None,
54            runtime_service: Arc::new(RwLock::new(Some(runtime_service))),
55        }
56    }
57
58    /// Set the PNS registration handler for full transport negotiation
59    /// Set registration handler (accepts trait object to break circular dependency)
60    pub fn set_registration_handler(&mut self, handler: Arc<dyn RegistrationHandlerTrait>) {
61        self.registration_handler = Some(handler);
62        info!("🦀 [AGENT-SERVICE] Registration handler connected");
63    }
64
65    /// Set the runtime service for sensory injection (thread-safe, can be called after Arc wrapping)
66    pub fn set_runtime_service(&self, runtime_service: Arc<dyn RuntimeServiceTrait + Send + Sync>) {
67        *self.runtime_service.write() = Some(runtime_service);
68        info!("🦀 [AGENT-SERVICE] Runtime service connected");
69    }
70}
71
72#[async_trait]
73impl AgentService for AgentServiceImpl {
74    async fn register_agent(
75        &self,
76        registration: AgentRegistration,
77    ) -> AgentResult<AgentRegistrationResponse> {
78        info!(
79            "🦀 [AGENT-SERVICE] Registering agent: {} (type: {})",
80            registration.agent_id, registration.agent_type
81        );
82
83        // If we have a registration handler, use it (gets full transport info)
84        if let Some(handler) = &self.registration_handler {
85            info!(
86                "📝 [AGENT-SERVICE] Using PNS registration handler for full transport negotiation"
87            );
88
89            // Build PNS registration request
90            let pns_request = RegistrationRequest {
91                agent_id: registration.agent_id.clone(),
92                agent_type: registration.agent_type.clone(),
93                capabilities: serde_json::to_value(&registration.capabilities)
94                    .unwrap_or(serde_json::Value::Object(serde_json::Map::new())),
95                chosen_transport: registration.chosen_transport.clone(), // Pass through the agent's transport choice
96            };
97
98            // Call registration handler - use spawn_blocking to avoid blocking the async runtime
99            // The process_registration method is sync and may block, so we offload it to a blocking thread pool
100            let handler_clone = handler.clone();
101            let pns_response = tokio::task::spawn_blocking(move || {
102                handler_clone.process_registration(pns_request)
103            })
104            .await
105            .map_err(|e| {
106                AgentError::RegistrationFailed(format!("Registration task panicked: {:?}", e))
107            })?
108            .map_err(AgentError::RegistrationFailed)?;
109
110            // Convert PNS transport configs to service transport configs
111            let transports = pns_response.transports.map(|ts| {
112                ts.into_iter()
113                    .map(|t| TransportConfig {
114                        transport_type: t.transport_type,
115                        enabled: t.enabled,
116                        ports: t.ports,
117                        host: t.host,
118                    })
119                    .collect()
120            });
121
122            // Serialize cortical areas with proper error handling (don't panic!)
123            let cortical_areas_json =
124                serde_json::to_value(&pns_response.cortical_areas).map_err(|e| {
125                    error!(
126                        "❌ [AGENT-SERVICE] Failed to serialize cortical areas: {}",
127                        e
128                    );
129                    AgentError::Internal(format!("Failed to serialize cortical areas: {}", e))
130                })?;
131
132            return Ok(AgentRegistrationResponse {
133                status: pns_response.status,
134                message: pns_response
135                    .message
136                    .unwrap_or_else(|| "Success".to_string()),
137                success: true,
138                transport: None,
139                rates: None,
140                transports,
141                recommended_transport: pns_response.recommended_transport,
142                shm_paths: pns_response.shm_paths,
143                cortical_areas: cortical_areas_json,
144            });
145        }
146
147        return Err(AgentError::RegistrationFailed(
148            "Registration handler not available - required for FEAGI 2.0".to_string(),
149        ));
150    }
151
152    async fn heartbeat(&self, request: HeartbeatRequest) -> AgentResult<()> {
153        // Check if agent exists before attempting heartbeat
154        let agent_exists = {
155            let registry = self.agent_registry.read();
156            registry.get(&request.agent_id).is_some()
157        };
158
159        if !agent_exists {
160            // Log diagnostic information when agent not found
161            let all_agents: Vec<String> = {
162                let registry = self.agent_registry.read();
163                registry
164                    .get_all()
165                    .iter()
166                    .map(|a| a.agent_id.clone())
167                    .collect()
168            };
169            warn!(
170                "⚠️ [HEARTBEAT] Agent '{}' not found in registry. Registered agents ({}): {:?}",
171                request.agent_id,
172                all_agents.len(),
173                all_agents
174            );
175            return Err(AgentError::NotFound(format!(
176                "Agent {} not found in registry (total registered: {})",
177                request.agent_id,
178                all_agents.len()
179            )));
180        }
181
182        // Agent exists - update heartbeat
183        self.agent_registry
184            .write()
185            .heartbeat(&request.agent_id)
186            .map_err(|e| {
187                error!(
188                    "❌ [HEARTBEAT] Failed to update heartbeat for '{}': {}",
189                    request.agent_id, e
190                );
191                AgentError::NotFound(e)
192            })?;
193        Ok(())
194    }
195
196    async fn list_agents(&self) -> AgentResult<Vec<String>> {
197        tracing::trace!(target: "feagi-services", "list_agents() called - acquiring registry read lock...");
198        let registry = self.agent_registry.read();
199        let agents = registry.get_all();
200        let agent_ids: Vec<String> = agents.iter().map(|a| a.agent_id.clone()).collect();
201
202        tracing::trace!(
203            target: "feagi-services",
204            "list_agents() found {} agents: {:?}",
205            agent_ids.len(),
206            agent_ids
207        );
208        tracing::trace!(
209            target: "feagi-services",
210            "AgentRegistry pointer: {:p}",
211            &*self.agent_registry as *const _
212        );
213
214        Ok(agent_ids)
215    }
216
217    async fn get_agent_properties(&self, agent_id: &str) -> AgentResult<AgentProperties> {
218        let registry = self.agent_registry.read();
219        let agent = registry
220            .get(agent_id)
221            .ok_or_else(|| AgentError::NotFound(format!("Agent {} not found", agent_id)))?;
222
223        // Extract properties from agent info
224        let agent_ip = agent
225            .metadata
226            .get("agent_ip")
227            .and_then(|v| v.as_str())
228            .unwrap_or("127.0.0.1")
229            .to_string();
230
231        let agent_data_port = agent
232            .metadata
233            .get("agent_data_port")
234            .and_then(|v| v.as_u64())
235            .unwrap_or(0) as u16;
236
237        let agent_version = agent
238            .metadata
239            .get("agent_version")
240            .and_then(|v| v.as_str())
241            .unwrap_or("unknown")
242            .to_string();
243
244        let controller_version = agent
245            .metadata
246            .get("controller_version")
247            .and_then(|v| v.as_str())
248            .unwrap_or("unknown")
249            .to_string();
250
251        let agent_router_address = format!("tcp://{}:{}", agent_ip, agent_data_port);
252
253        // Build full capabilities map using feagi-sensorimotor format: {"input": [...], "output": [...]}
254        let mut capabilities = HashMap::new();
255
256        // Add vision capability if present
257        if let Some(ref vision) = agent.capabilities.vision {
258            capabilities.insert(
259                "vision".to_string(),
260                serde_json::to_value(vision).unwrap_or(serde_json::Value::Null),
261            );
262        }
263
264        // Add output capability (feagi-sensorimotor format)
265        if let Some(ref motor) = agent.capabilities.motor {
266            // Convert motor capability to "output" format: array of cortical IDs
267            let output_areas: Vec<String> = motor.source_cortical_areas.clone();
268            capabilities.insert(
269                "output".to_string(),
270                serde_json::to_value(output_areas).unwrap_or(serde_json::Value::Null),
271            );
272        }
273
274        // Add visualization capability if present
275        if let Some(ref viz) = agent.capabilities.visualization {
276            capabilities.insert(
277                "visualization".to_string(),
278                serde_json::to_value(viz).unwrap_or(serde_json::Value::Null),
279            );
280        }
281
282        // Add input capability (feagi-sensorimotor format)
283        // Note: cortical_mappings removed - device registrations are handled separately
284        if let Some(ref _sensory) = agent.capabilities.sensory {
285            // Return empty array since cortical_mappings no longer exist
286            // Device registrations should be accessed via device_registrations endpoint
287            capabilities.insert(
288                "input".to_string(),
289                serde_json::to_value(Vec::<String>::new()).unwrap_or(serde_json::Value::Null),
290            );
291        }
292
293        // Add custom capabilities
294        for (k, v) in agent.capabilities.custom.iter() {
295            capabilities.insert(k.clone(), v.clone());
296        }
297
298        Ok(AgentProperties {
299            agent_type: agent.agent_type.to_string(),
300            agent_ip,
301            agent_data_port,
302            agent_router_address,
303            agent_version,
304            controller_version,
305            capabilities,
306            chosen_transport: agent.chosen_transport.clone(),
307        })
308    }
309
310    async fn get_shared_memory_info(
311        &self,
312    ) -> AgentResult<HashMap<String, HashMap<String, serde_json::Value>>> {
313        // TODO: Implement shared memory tracking in agent registry
314        Ok(HashMap::new())
315    }
316
317    async fn deregister_agent(&self, agent_id: &str) -> AgentResult<()> {
318        self.agent_registry
319            .write()
320            .deregister(agent_id)
321            .map_err(AgentError::NotFound)?;
322
323        info!(
324            "✅ [AGENT-SERVICE] Agent '{}' deregistered successfully",
325            agent_id
326        );
327        Ok(())
328    }
329
330    async fn manual_stimulation(
331        &self,
332        stimulation_payload: HashMap<String, Vec<Vec<i32>>>,
333        mode: ManualStimulationMode,
334    ) -> AgentResult<HashMap<String, serde_json::Value>> {
335        // Use RuntimeService for sensory injection (service layer, not direct NPU access)
336        let runtime_service = self
337            .runtime_service
338            .read()
339            .as_ref()
340            .ok_or_else(|| {
341                AgentError::Internal(
342                    "Runtime service not available - cannot inject stimuli".to_string(),
343                )
344            })?
345            .clone();
346
347        let mut result = HashMap::new();
348        let mut total_stimulated = 0usize;
349        let mut requested_coordinates = 0usize;
350        let mut successful_areas = 0;
351        let mut failed_areas = Vec::new();
352        let mut coordinates_not_found = 0;
353
354        // Default potential for manual stimulation (high enough to trigger firing)
355        const DEFAULT_POTENTIAL: f32 = 100.0;
356
357        // First pass: validate all cortical areas and build injection data
358        let mut injection_requests: Vec<(String, Vec<(u32, u32, u32, f32)>)> = Vec::new();
359
360        {
361            let manager = self.connectome_manager.read();
362
363            for (cortical_id, coordinates) in stimulation_payload.iter() {
364                requested_coordinates += coordinates.len();
365                let cortical_id_typed = match CorticalID::try_from_base_64(cortical_id) {
366                    Ok(id) => id,
367                    Err(e) => {
368                        failed_areas.push(cortical_id.clone());
369                        warn!("Invalid cortical ID '{}': {}", cortical_id, e);
370                        continue;
371                    }
372                };
373
374                match manager.get_cortical_area(&cortical_id_typed) {
375                    Some(_area) => {
376                        // Build xyzp_data for this cortical area (coordinates with potential)
377                        let mut xyzp_data = Vec::new();
378
379                        for coord in coordinates {
380                            if coord.len() != 3 {
381                                warn!(
382                                    "Invalid coordinate format: {:?} (expected [x, y, z])",
383                                    coord
384                                );
385                                coordinates_not_found += 1;
386                                continue;
387                            }
388
389                            let x = coord[0] as u32;
390                            let y = coord[1] as u32;
391                            let z = coord[2] as u32;
392
393                            // Add coordinate with default potential
394                            xyzp_data.push((x, y, z, DEFAULT_POTENTIAL));
395                        }
396
397                        if !xyzp_data.is_empty() {
398                            injection_requests.push((cortical_id.clone(), xyzp_data));
399                            successful_areas += 1;
400                        }
401                    }
402                    None => {
403                        failed_areas.push(cortical_id.clone());
404                    }
405                }
406            }
407        } // Drop manager lock here before await
408
409        // Second pass: perform injections (no locks held)
410        for (cortical_id, xyzp_data) in injection_requests {
411            match runtime_service
412                .inject_sensory_by_coordinates(
413                    &cortical_id,
414                    &xyzp_data,
415                    match mode {
416                        ManualStimulationMode::Candidate => {
417                            crate::traits::runtime_service::ManualStimulationMode::Candidate
418                        }
419                        ManualStimulationMode::ForceFire => {
420                            crate::traits::runtime_service::ManualStimulationMode::ForceFire
421                        }
422                    },
423                )
424                .await
425            {
426                Ok(injected_count) => {
427                    total_stimulated += injected_count;
428                    if injected_count < xyzp_data.len() {
429                        coordinates_not_found += xyzp_data.len() - injected_count;
430                    }
431                }
432                Err(e) => {
433                    error!(
434                        "❌ [MANUAL-STIMULATION] Failed to inject for area {}: {}",
435                        cortical_id, e
436                    );
437                    coordinates_not_found += xyzp_data.len();
438                }
439            }
440        }
441
442        result.insert(
443            "success".to_string(),
444            serde_json::json!(failed_areas.is_empty() && coordinates_not_found == 0),
445        );
446        result.insert(
447            "total_coordinates".to_string(),
448            serde_json::json!(total_stimulated),
449        );
450        result.insert(
451            "requested_coordinates".to_string(),
452            serde_json::json!(requested_coordinates),
453        );
454        result.insert(
455            "matched_coordinates".to_string(),
456            serde_json::json!(total_stimulated),
457        );
458        result.insert(
459            "unique_neuron_ids".to_string(),
460            serde_json::json!(total_stimulated),
461        );
462        result.insert(
463            "mode".to_string(),
464            serde_json::json!(match mode {
465                ManualStimulationMode::Candidate => "candidate",
466                ManualStimulationMode::ForceFire => "force_fire",
467            }),
468        );
469        result.insert(
470            "successful_areas".to_string(),
471            serde_json::json!(successful_areas),
472        );
473        result.insert("failed_areas".to_string(), serde_json::json!(failed_areas));
474
475        if coordinates_not_found > 0 {
476            result.insert(
477                "coordinates_not_found".to_string(),
478                serde_json::json!(coordinates_not_found),
479            );
480        }
481
482        if !failed_areas.is_empty() {
483            result.insert(
484                "error".to_string(),
485                serde_json::json!(format!("Some cortical areas not found: {:?}", failed_areas)),
486            );
487        }
488
489        Ok(result)
490    }
491
492    fn try_set_runtime_service(&self, runtime_service: Arc<dyn RuntimeServiceTrait + Send + Sync>) {
493        self.set_runtime_service(runtime_service);
494    }
495}