Skip to main content

feagi_services/impls/
agent_service_impl.rs

1// Copyright 2025 Neuraville Inc.
2// Licensed under the Apache License, Version 2.0
3
4//! Agent service implementation
5//!
6//! This implementation delegates all agent management to the PNS AgentRegistry,
7//! ensuring centralized coordination consistent with the Python implementation.
8
9use async_trait::async_trait;
10use parking_lot::RwLock;
11use std::collections::HashMap;
12use std::sync::Arc;
13use tracing::{debug, error, info, warn};
14
15use crate::traits::agent_service::*;
16use crate::traits::registration_handler::RegistrationHandlerTrait;
17use crate::traits::RuntimeService as RuntimeServiceTrait;
18use crate::types::agent_registry::AgentRegistry;
19use crate::types::registration::RegistrationRequest;
20use feagi_brain_development::ConnectomeManager;
21use feagi_structures::genomic::cortical_area::CorticalID;
22
/// Implementation of the Agent service
///
/// Holds shared handles to the connectome and the agent registry, plus two
/// collaborators that may be attached after construction: the registration
/// handler and the runtime service.
pub struct AgentServiceImpl {
    /// Shared connectome manager; read by `manual_stimulation` to check that a
    /// requested cortical area exists.
    connectome_manager: Arc<RwLock<ConnectomeManager>>,
    /// Shared agent registry backing heartbeat, listing, property lookup and
    /// deregistration.
    agent_registry: Arc<RwLock<AgentRegistry>>,
    /// Registration handler used by `register_agent`; `None` until
    /// `set_registration_handler` is called.
    registration_handler: Option<Arc<dyn RegistrationHandlerTrait>>,
    /// Runtime service used for stimulus injection. Kept behind a lock so it
    /// can be bound or replaced through `&self`.
    runtime_service: Arc<RwLock<Option<Arc<dyn RuntimeServiceTrait + Send + Sync>>>>,
}
30
31impl AgentServiceImpl {
32    pub fn new(
33        connectome_manager: Arc<RwLock<ConnectomeManager>>,
34        agent_registry: Arc<RwLock<AgentRegistry>>,
35    ) -> Self {
36        Self {
37            connectome_manager,
38            agent_registry,
39            registration_handler: None,
40            runtime_service: Arc::new(RwLock::new(None)),
41        }
42    }
43
44    /// Create AgentServiceImpl with runtime service
45    pub fn new_with_runtime(
46        connectome_manager: Arc<RwLock<ConnectomeManager>>,
47        agent_registry: Arc<RwLock<AgentRegistry>>,
48        runtime_service: Arc<dyn RuntimeServiceTrait + Send + Sync>,
49    ) -> Self {
50        Self {
51            connectome_manager,
52            agent_registry,
53            registration_handler: None,
54            runtime_service: Arc::new(RwLock::new(Some(runtime_service))),
55        }
56    }
57
58    /// Set the PNS registration handler for full transport negotiation
59    /// Set registration handler (accepts trait object to break circular dependency)
60    pub fn set_registration_handler(&mut self, handler: Arc<dyn RegistrationHandlerTrait>) {
61        self.registration_handler = Some(handler);
62        info!("🦀 [AGENT-SERVICE] Registration handler connected");
63    }
64
65    /// Set the runtime service for sensory injection (thread-safe, can be called after Arc wrapping)
66    pub fn set_runtime_service(&self, runtime_service: Arc<dyn RuntimeServiceTrait + Send + Sync>) {
67        let mut guard = self.runtime_service.write();
68        if let Some(existing) = guard.as_ref() {
69            if Arc::ptr_eq(existing, &runtime_service) {
70                debug!(
71                    "🦀 [AGENT-SERVICE] Runtime service already connected; ignoring duplicate bind"
72                );
73                return;
74            }
75            warn!("🦀 [AGENT-SERVICE] Runtime service replaced with a new instance");
76        } else {
77            info!("🦀 [AGENT-SERVICE] Runtime service connected");
78        }
79        *guard = Some(runtime_service);
80    }
81}
82
83#[async_trait]
84impl AgentService for AgentServiceImpl {
85    async fn register_agent(
86        &self,
87        registration: AgentRegistration,
88    ) -> AgentResult<AgentRegistrationResponse> {
89        info!(
90            "🦀 [AGENT-SERVICE] Registering agent: {} (type: {})",
91            registration.agent_id, registration.agent_type
92        );
93
94        // If we have a registration handler, use it (gets full transport info)
95        if let Some(handler) = &self.registration_handler {
96            info!(
97                "📝 [AGENT-SERVICE] Using PNS registration handler for full transport negotiation"
98            );
99
100            // Build PNS registration request
101            let pns_request = RegistrationRequest {
102                agent_id: registration.agent_id.clone(),
103                agent_type: registration.agent_type.clone(),
104                capabilities: serde_json::to_value(&registration.capabilities)
105                    .unwrap_or(serde_json::Value::Object(serde_json::Map::new())),
106                chosen_transport: registration.chosen_transport.clone(), // Pass through the agent's transport choice
107            };
108
109            // Call registration handler - use spawn_blocking to avoid blocking the async runtime
110            // The process_registration method is sync and may block, so we offload it to a blocking thread pool
111            let handler_clone = handler.clone();
112            let pns_response = tokio::task::spawn_blocking(move || {
113                handler_clone.process_registration(pns_request)
114            })
115            .await
116            .map_err(|e| {
117                AgentError::RegistrationFailed(format!("Registration task panicked: {:?}", e))
118            })?
119            .map_err(AgentError::RegistrationFailed)?;
120
121            // Convert PNS transport configs to service transport configs
122            let transports = pns_response.transports.map(|ts| {
123                ts.into_iter()
124                    .map(|t| TransportConfig {
125                        transport_type: t.transport_type,
126                        enabled: t.enabled,
127                        ports: t.ports,
128                        host: t.host,
129                    })
130                    .collect()
131            });
132
133            // Serialize cortical areas with proper error handling (don't panic!)
134            let cortical_areas_json =
135                serde_json::to_value(&pns_response.cortical_areas).map_err(|e| {
136                    error!(
137                        "❌ [AGENT-SERVICE] Failed to serialize cortical areas: {}",
138                        e
139                    );
140                    AgentError::Internal(format!("Failed to serialize cortical areas: {}", e))
141                })?;
142
143            return Ok(AgentRegistrationResponse {
144                status: pns_response.status,
145                message: pns_response
146                    .message
147                    .unwrap_or_else(|| "Success".to_string()),
148                success: true,
149                transport: None,
150                rates: None,
151                transports,
152                recommended_transport: pns_response.recommended_transport,
153                shm_paths: pns_response.shm_paths,
154                cortical_areas: cortical_areas_json,
155            });
156        }
157
158        return Err(AgentError::RegistrationFailed(
159            "Registration handler not available - required for FEAGI 2.0".to_string(),
160        ));
161    }
162
163    async fn heartbeat(&self, request: HeartbeatRequest) -> AgentResult<()> {
164        // Check if agent exists before attempting heartbeat
165        let agent_exists = {
166            let registry = self.agent_registry.read();
167            registry.get(&request.agent_id).is_some()
168        };
169
170        if !agent_exists {
171            // Log diagnostic information when agent not found
172            let all_agents: Vec<String> = {
173                let registry = self.agent_registry.read();
174                registry
175                    .get_all()
176                    .iter()
177                    .map(|a| a.agent_id.clone())
178                    .collect()
179            };
180            warn!(
181                "⚠️ [HEARTBEAT] Agent '{}' not found in registry. Registered agents ({}): {:?}",
182                request.agent_id,
183                all_agents.len(),
184                all_agents
185            );
186            return Err(AgentError::NotFound(format!(
187                "Agent {} not found in registry (total registered: {})",
188                request.agent_id,
189                all_agents.len()
190            )));
191        }
192
193        // Agent exists - update heartbeat
194        self.agent_registry
195            .write()
196            .heartbeat(&request.agent_id)
197            .map_err(|e| {
198                error!(
199                    "❌ [HEARTBEAT] Failed to update heartbeat for '{}': {}",
200                    request.agent_id, e
201                );
202                AgentError::NotFound(e)
203            })?;
204        Ok(())
205    }
206
207    async fn list_agents(&self) -> AgentResult<Vec<String>> {
208        tracing::trace!(target: "feagi-services", "list_agents() called - acquiring registry read lock...");
209        let registry = self.agent_registry.read();
210        let agents = registry.get_all();
211        let agent_ids: Vec<String> = agents.iter().map(|a| a.agent_id.clone()).collect();
212
213        tracing::trace!(
214            target: "feagi-services",
215            "list_agents() found {} agents: {:?}",
216            agent_ids.len(),
217            agent_ids
218        );
219        tracing::trace!(
220            target: "feagi-services",
221            "AgentRegistry pointer: {:p}",
222            &*self.agent_registry as *const _
223        );
224
225        Ok(agent_ids)
226    }
227
228    async fn get_agent_properties(&self, agent_id: &str) -> AgentResult<AgentProperties> {
229        let registry = self.agent_registry.read();
230        let agent = registry
231            .get(agent_id)
232            .ok_or_else(|| AgentError::NotFound(format!("Agent {} not found", agent_id)))?;
233
234        // Extract properties from agent info
235        let agent_ip = agent
236            .metadata
237            .get("agent_ip")
238            .and_then(|v| v.as_str())
239            .unwrap_or("127.0.0.1")
240            .to_string();
241
242        let agent_data_port = agent
243            .metadata
244            .get("agent_data_port")
245            .and_then(|v| v.as_u64())
246            .unwrap_or(0) as u16;
247
248        let agent_version = agent
249            .metadata
250            .get("agent_version")
251            .and_then(|v| v.as_str())
252            .unwrap_or("unknown")
253            .to_string();
254
255        let controller_version = agent
256            .metadata
257            .get("controller_version")
258            .and_then(|v| v.as_str())
259            .unwrap_or("unknown")
260            .to_string();
261
262        let agent_router_address = format!("tcp://{}:{}", agent_ip, agent_data_port);
263
264        // Build full capabilities map using feagi-sensorimotor format: {"input": [...], "output": [...]}
265        let mut capabilities = HashMap::new();
266
267        // Add vision capability if present
268        if let Some(ref vision) = agent.capabilities.vision {
269            capabilities.insert(
270                "vision".to_string(),
271                serde_json::to_value(vision).unwrap_or(serde_json::Value::Null),
272            );
273        }
274
275        // Add output capability (feagi-sensorimotor format)
276        if let Some(ref motor) = agent.capabilities.motor {
277            // Convert motor capability to "output" format: array of cortical IDs
278            let output_areas: Vec<String> = motor.source_cortical_areas.clone();
279            capabilities.insert(
280                "output".to_string(),
281                serde_json::to_value(output_areas).unwrap_or(serde_json::Value::Null),
282            );
283        }
284
285        // Add visualization capability if present
286        if let Some(ref viz) = agent.capabilities.visualization {
287            capabilities.insert(
288                "visualization".to_string(),
289                serde_json::to_value(viz).unwrap_or(serde_json::Value::Null),
290            );
291        }
292
293        // Add input capability (feagi-sensorimotor format)
294        // Note: cortical_mappings removed - device registrations are handled separately
295        if let Some(ref _sensory) = agent.capabilities.sensory {
296            // Return empty array since cortical_mappings no longer exist
297            // Device registrations should be accessed via device_registrations endpoint
298            capabilities.insert(
299                "input".to_string(),
300                serde_json::to_value(Vec::<String>::new()).unwrap_or(serde_json::Value::Null),
301            );
302        }
303
304        // Add custom capabilities
305        for (k, v) in agent.capabilities.custom.iter() {
306            capabilities.insert(k.clone(), v.clone());
307        }
308
309        Ok(AgentProperties {
310            agent_type: agent.agent_type.to_string(),
311            agent_ip,
312            agent_data_port,
313            agent_router_address,
314            agent_version,
315            controller_version,
316            capabilities,
317            chosen_transport: agent.chosen_transport.clone(),
318        })
319    }
320
    /// Report shared-memory information for agents.
    ///
    /// Currently a stub: it always returns an empty map (see the TODO below).
    async fn get_shared_memory_info(
        &self,
    ) -> AgentResult<HashMap<String, HashMap<String, serde_json::Value>>> {
        // TODO: Implement shared memory tracking in agent registry
        Ok(HashMap::new())
    }
327
328    async fn deregister_agent(&self, agent_id: &str) -> AgentResult<()> {
329        self.agent_registry
330            .write()
331            .deregister(agent_id)
332            .map_err(AgentError::NotFound)?;
333
334        info!(
335            "✅ [AGENT-SERVICE] Agent '{}' deregistered successfully",
336            agent_id
337        );
338        Ok(())
339    }
340
341    async fn manual_stimulation(
342        &self,
343        stimulation_payload: HashMap<String, Vec<Vec<i32>>>,
344        mode: ManualStimulationMode,
345    ) -> AgentResult<HashMap<String, serde_json::Value>> {
346        // Manual stimulation is authoritative: always use force-fire injection so every resolved
347        // voxel neuron is merged into the fire queue after dynamics, regardless of MP/refractory/
348        // excitability. The `mode` argument is kept for API compatibility only.
349        let _ = mode;
350
351        // Use RuntimeService for sensory injection (service layer, not direct NPU access)
352        let runtime_service = self
353            .runtime_service
354            .read()
355            .as_ref()
356            .ok_or_else(|| {
357                AgentError::Internal(
358                    "Runtime service not available - cannot inject stimuli".to_string(),
359                )
360            })?
361            .clone();
362
363        let mut result = HashMap::new();
364        let mut total_stimulated = 0usize;
365        let mut requested_coordinates = 0usize;
366        let mut successful_areas = 0;
367        let mut failed_areas = Vec::new();
368        let mut coordinates_not_found = 0;
369
370        // Default potential for manual stimulation (high enough to trigger firing)
371        const DEFAULT_POTENTIAL: f32 = 100.0;
372
373        // First pass: validate all cortical areas and build injection data
374        let mut injection_requests: Vec<(String, Vec<(u32, u32, u32, f32)>)> = Vec::new();
375
376        {
377            let manager = self.connectome_manager.read();
378
379            for (cortical_id, coordinates) in stimulation_payload.iter() {
380                requested_coordinates += coordinates.len();
381                let cortical_id_typed = match CorticalID::try_from_base_64(cortical_id) {
382                    Ok(id) => id,
383                    Err(e) => {
384                        failed_areas.push(cortical_id.clone());
385                        warn!("Invalid cortical ID '{}': {}", cortical_id, e);
386                        continue;
387                    }
388                };
389
390                match manager.get_cortical_area(&cortical_id_typed) {
391                    Some(_area) => {
392                        // Build xyzp_data for this cortical area (coordinates with potential)
393                        let mut xyzp_data = Vec::new();
394
395                        for coord in coordinates {
396                            if coord.len() != 3 {
397                                warn!(
398                                    "Invalid coordinate format: {:?} (expected [x, y, z])",
399                                    coord
400                                );
401                                coordinates_not_found += 1;
402                                continue;
403                            }
404
405                            let x = coord[0] as u32;
406                            let y = coord[1] as u32;
407                            let z = coord[2] as u32;
408
409                            // Add coordinate with default potential
410                            xyzp_data.push((x, y, z, DEFAULT_POTENTIAL));
411                        }
412
413                        if !xyzp_data.is_empty() {
414                            injection_requests.push((cortical_id.clone(), xyzp_data));
415                            successful_areas += 1;
416                        }
417                    }
418                    None => {
419                        failed_areas.push(cortical_id.clone());
420                    }
421                }
422            }
423        } // Drop manager lock here before await
424
425        // Second pass: perform injections (no locks held)
426        for (cortical_id, xyzp_data) in injection_requests {
427            match runtime_service
428                .inject_sensory_by_coordinates(
429                    &cortical_id,
430                    &xyzp_data,
431                    crate::traits::runtime_service::ManualStimulationMode::ForceFire,
432                )
433                .await
434            {
435                Ok(injected_count) => {
436                    total_stimulated += injected_count;
437                    if injected_count < xyzp_data.len() {
438                        coordinates_not_found += xyzp_data.len() - injected_count;
439                    }
440                }
441                Err(e) => {
442                    error!(
443                        "❌ [MANUAL-STIMULATION] Failed to inject for area {}: {}",
444                        cortical_id, e
445                    );
446                    coordinates_not_found += xyzp_data.len();
447                }
448            }
449        }
450
451        result.insert(
452            "success".to_string(),
453            serde_json::json!(failed_areas.is_empty() && coordinates_not_found == 0),
454        );
455        result.insert(
456            "total_coordinates".to_string(),
457            serde_json::json!(total_stimulated),
458        );
459        result.insert(
460            "requested_coordinates".to_string(),
461            serde_json::json!(requested_coordinates),
462        );
463        result.insert(
464            "matched_coordinates".to_string(),
465            serde_json::json!(total_stimulated),
466        );
467        result.insert(
468            "unique_neuron_ids".to_string(),
469            serde_json::json!(total_stimulated),
470        );
471        result.insert("mode".to_string(), serde_json::json!("force_fire"));
472        result.insert(
473            "successful_areas".to_string(),
474            serde_json::json!(successful_areas),
475        );
476        result.insert("failed_areas".to_string(), serde_json::json!(failed_areas));
477
478        if coordinates_not_found > 0 {
479            result.insert(
480                "coordinates_not_found".to_string(),
481                serde_json::json!(coordinates_not_found),
482            );
483        }
484
485        if !failed_areas.is_empty() {
486            result.insert(
487                "error".to_string(),
488                serde_json::json!(format!("Some cortical areas not found: {:?}", failed_areas)),
489            );
490        }
491
492        Ok(result)
493    }
494
    /// Bind the runtime service through the `AgentService` trait.
    ///
    /// Forwards directly to the inherent `set_runtime_service`.
    fn try_set_runtime_service(&self, runtime_service: Arc<dyn RuntimeServiceTrait + Send + Sync>) {
        self.set_runtime_service(runtime_service);
    }
498}