Skip to main content

feagi_services/impls/
agent_service_impl.rs

1// Copyright 2025 Neuraville Inc.
2// Licensed under the Apache License, Version 2.0
3
4//! Agent service implementation
5//!
6//! This implementation delegates all agent management to the PNS AgentRegistry,
7//! ensuring centralized coordination consistent with the Python implementation.
8
9use async_trait::async_trait;
10use parking_lot::RwLock;
11use std::collections::HashMap;
12use std::sync::Arc;
13use tracing::{debug, error, info, warn};
14
15use crate::traits::agent_service::*;
16use crate::traits::registration_handler::RegistrationHandlerTrait;
17use crate::traits::RuntimeService as RuntimeServiceTrait;
18use crate::types::agent_registry::AgentRegistry;
19use crate::types::registration::RegistrationRequest;
20use feagi_brain_development::ConnectomeManager;
21use feagi_structures::genomic::cortical_area::CorticalID;
22
23/// Implementation of the Agent service
24pub struct AgentServiceImpl {
25    connectome_manager: Arc<RwLock<ConnectomeManager>>,
26    agent_registry: Arc<RwLock<AgentRegistry>>,
27    registration_handler: Option<Arc<dyn RegistrationHandlerTrait>>,
28    runtime_service: Arc<RwLock<Option<Arc<dyn RuntimeServiceTrait + Send + Sync>>>>,
29}
30
31impl AgentServiceImpl {
32    pub fn new(
33        connectome_manager: Arc<RwLock<ConnectomeManager>>,
34        agent_registry: Arc<RwLock<AgentRegistry>>,
35    ) -> Self {
36        Self {
37            connectome_manager,
38            agent_registry,
39            registration_handler: None,
40            runtime_service: Arc::new(RwLock::new(None)),
41        }
42    }
43
44    /// Create AgentServiceImpl with runtime service
45    pub fn new_with_runtime(
46        connectome_manager: Arc<RwLock<ConnectomeManager>>,
47        agent_registry: Arc<RwLock<AgentRegistry>>,
48        runtime_service: Arc<dyn RuntimeServiceTrait + Send + Sync>,
49    ) -> Self {
50        Self {
51            connectome_manager,
52            agent_registry,
53            registration_handler: None,
54            runtime_service: Arc::new(RwLock::new(Some(runtime_service))),
55        }
56    }
57
58    /// Set the PNS registration handler for full transport negotiation
59    /// Set registration handler (accepts trait object to break circular dependency)
60    pub fn set_registration_handler(&mut self, handler: Arc<dyn RegistrationHandlerTrait>) {
61        self.registration_handler = Some(handler);
62        info!("🦀 [AGENT-SERVICE] Registration handler connected");
63    }
64
65    /// Set the runtime service for sensory injection (thread-safe, can be called after Arc wrapping)
66    pub fn set_runtime_service(&self, runtime_service: Arc<dyn RuntimeServiceTrait + Send + Sync>) {
67        let mut guard = self.runtime_service.write();
68        if let Some(existing) = guard.as_ref() {
69            if Arc::ptr_eq(existing, &runtime_service) {
70                debug!(
71                    "🦀 [AGENT-SERVICE] Runtime service already connected; ignoring duplicate bind"
72                );
73                return;
74            }
75            warn!("🦀 [AGENT-SERVICE] Runtime service replaced with a new instance");
76        } else {
77            info!("🦀 [AGENT-SERVICE] Runtime service connected");
78        }
79        *guard = Some(runtime_service);
80    }
81}
82
83#[async_trait]
84impl AgentService for AgentServiceImpl {
85    async fn register_agent(
86        &self,
87        registration: AgentRegistration,
88    ) -> AgentResult<AgentRegistrationResponse> {
89        info!(
90            "🦀 [AGENT-SERVICE] Registering agent: {} (type: {})",
91            registration.agent_id, registration.agent_type
92        );
93
94        // If we have a registration handler, use it (gets full transport info)
95        if let Some(handler) = &self.registration_handler {
96            info!(
97                "📝 [AGENT-SERVICE] Using PNS registration handler for full transport negotiation"
98            );
99
100            // Build PNS registration request
101            let pns_request = RegistrationRequest {
102                agent_id: registration.agent_id.clone(),
103                agent_type: registration.agent_type.clone(),
104                capabilities: serde_json::to_value(&registration.capabilities)
105                    .unwrap_or(serde_json::Value::Object(serde_json::Map::new())),
106                chosen_transport: registration.chosen_transport.clone(), // Pass through the agent's transport choice
107            };
108
109            // Call registration handler - use spawn_blocking to avoid blocking the async runtime
110            // The process_registration method is sync and may block, so we offload it to a blocking thread pool
111            let handler_clone = handler.clone();
112            let pns_response = tokio::task::spawn_blocking(move || {
113                handler_clone.process_registration(pns_request)
114            })
115            .await
116            .map_err(|e| {
117                AgentError::RegistrationFailed(format!("Registration task panicked: {:?}", e))
118            })?
119            .map_err(AgentError::RegistrationFailed)?;
120
121            // Convert PNS transport configs to service transport configs
122            let transports = pns_response.transports.map(|ts| {
123                ts.into_iter()
124                    .map(|t| TransportConfig {
125                        transport_type: t.transport_type,
126                        enabled: t.enabled,
127                        ports: t.ports,
128                        host: t.host,
129                    })
130                    .collect()
131            });
132
133            // Serialize cortical areas with proper error handling (don't panic!)
134            let cortical_areas_json =
135                serde_json::to_value(&pns_response.cortical_areas).map_err(|e| {
136                    error!(
137                        "❌ [AGENT-SERVICE] Failed to serialize cortical areas: {}",
138                        e
139                    );
140                    AgentError::Internal(format!("Failed to serialize cortical areas: {}", e))
141                })?;
142
143            return Ok(AgentRegistrationResponse {
144                status: pns_response.status,
145                message: pns_response
146                    .message
147                    .unwrap_or_else(|| "Success".to_string()),
148                success: true,
149                transport: None,
150                rates: None,
151                transports,
152                recommended_transport: pns_response.recommended_transport,
153                shm_paths: pns_response.shm_paths,
154                cortical_areas: cortical_areas_json,
155            });
156        }
157
158        return Err(AgentError::RegistrationFailed(
159            "Registration handler not available - required for FEAGI 2.0".to_string(),
160        ));
161    }
162
163    async fn heartbeat(&self, request: HeartbeatRequest) -> AgentResult<()> {
164        // Check if agent exists before attempting heartbeat
165        let agent_exists = {
166            let registry = self.agent_registry.read();
167            registry.get(&request.agent_id).is_some()
168        };
169
170        if !agent_exists {
171            // Log diagnostic information when agent not found
172            let all_agents: Vec<String> = {
173                let registry = self.agent_registry.read();
174                registry
175                    .get_all()
176                    .iter()
177                    .map(|a| a.agent_id.clone())
178                    .collect()
179            };
180            warn!(
181                "⚠️ [HEARTBEAT] Agent '{}' not found in registry. Registered agents ({}): {:?}",
182                request.agent_id,
183                all_agents.len(),
184                all_agents
185            );
186            return Err(AgentError::NotFound(format!(
187                "Agent {} not found in registry (total registered: {})",
188                request.agent_id,
189                all_agents.len()
190            )));
191        }
192
193        // Agent exists - update heartbeat
194        self.agent_registry
195            .write()
196            .heartbeat(&request.agent_id)
197            .map_err(|e| {
198                error!(
199                    "❌ [HEARTBEAT] Failed to update heartbeat for '{}': {}",
200                    request.agent_id, e
201                );
202                AgentError::NotFound(e)
203            })?;
204        Ok(())
205    }
206
207    async fn list_agents(&self) -> AgentResult<Vec<String>> {
208        tracing::trace!(target: "feagi-services", "list_agents() called - acquiring registry read lock...");
209        let registry = self.agent_registry.read();
210        let agents = registry.get_all();
211        let agent_ids: Vec<String> = agents.iter().map(|a| a.agent_id.clone()).collect();
212
213        tracing::trace!(
214            target: "feagi-services",
215            "list_agents() found {} agents: {:?}",
216            agent_ids.len(),
217            agent_ids
218        );
219        tracing::trace!(
220            target: "feagi-services",
221            "AgentRegistry pointer: {:p}",
222            &*self.agent_registry as *const _
223        );
224
225        Ok(agent_ids)
226    }
227
228    async fn get_agent_properties(&self, agent_id: &str) -> AgentResult<AgentProperties> {
229        let registry = self.agent_registry.read();
230        let agent = registry
231            .get(agent_id)
232            .ok_or_else(|| AgentError::NotFound(format!("Agent {} not found", agent_id)))?;
233
234        // Extract properties from agent info
235        let agent_ip = agent
236            .metadata
237            .get("agent_ip")
238            .and_then(|v| v.as_str())
239            .unwrap_or("127.0.0.1")
240            .to_string();
241
242        let agent_data_port = agent
243            .metadata
244            .get("agent_data_port")
245            .and_then(|v| v.as_u64())
246            .unwrap_or(0) as u16;
247
248        let agent_version = agent
249            .metadata
250            .get("agent_version")
251            .and_then(|v| v.as_str())
252            .unwrap_or("unknown")
253            .to_string();
254
255        let controller_version = agent
256            .metadata
257            .get("controller_version")
258            .and_then(|v| v.as_str())
259            .unwrap_or("unknown")
260            .to_string();
261
262        let agent_router_address = format!("tcp://{}:{}", agent_ip, agent_data_port);
263
264        // Build full capabilities map using feagi-sensorimotor format: {"input": [...], "output": [...]}
265        let mut capabilities = HashMap::new();
266
267        // Add vision capability if present
268        if let Some(ref vision) = agent.capabilities.vision {
269            capabilities.insert(
270                "vision".to_string(),
271                serde_json::to_value(vision).unwrap_or(serde_json::Value::Null),
272            );
273        }
274
275        // Add output capability (feagi-sensorimotor format)
276        if let Some(ref motor) = agent.capabilities.motor {
277            // Convert motor capability to "output" format: array of cortical IDs
278            let output_areas: Vec<String> = motor.source_cortical_areas.clone();
279            capabilities.insert(
280                "output".to_string(),
281                serde_json::to_value(output_areas).unwrap_or(serde_json::Value::Null),
282            );
283        }
284
285        // Add visualization capability if present
286        if let Some(ref viz) = agent.capabilities.visualization {
287            capabilities.insert(
288                "visualization".to_string(),
289                serde_json::to_value(viz).unwrap_or(serde_json::Value::Null),
290            );
291        }
292
293        // Add input capability (feagi-sensorimotor format)
294        // Note: cortical_mappings removed - device registrations are handled separately
295        if let Some(ref _sensory) = agent.capabilities.sensory {
296            // Return empty array since cortical_mappings no longer exist
297            // Device registrations should be accessed via device_registrations endpoint
298            capabilities.insert(
299                "input".to_string(),
300                serde_json::to_value(Vec::<String>::new()).unwrap_or(serde_json::Value::Null),
301            );
302        }
303
304        // Add custom capabilities
305        for (k, v) in agent.capabilities.custom.iter() {
306            capabilities.insert(k.clone(), v.clone());
307        }
308
309        Ok(AgentProperties {
310            agent_type: agent.agent_type.to_string(),
311            agent_ip,
312            agent_data_port,
313            agent_router_address,
314            agent_version,
315            controller_version,
316            capabilities,
317            chosen_transport: agent.chosen_transport.clone(),
318        })
319    }
320
321    async fn get_shared_memory_info(
322        &self,
323    ) -> AgentResult<HashMap<String, HashMap<String, serde_json::Value>>> {
324        // TODO: Implement shared memory tracking in agent registry
325        Ok(HashMap::new())
326    }
327
328    async fn deregister_agent(&self, agent_id: &str) -> AgentResult<()> {
329        self.agent_registry
330            .write()
331            .deregister(agent_id)
332            .map_err(AgentError::NotFound)?;
333
334        info!(
335            "✅ [AGENT-SERVICE] Agent '{}' deregistered successfully",
336            agent_id
337        );
338        Ok(())
339    }
340
341    async fn manual_stimulation(
342        &self,
343        stimulation_payload: HashMap<String, Vec<Vec<i32>>>,
344        mode: ManualStimulationMode,
345    ) -> AgentResult<HashMap<String, serde_json::Value>> {
346        // Use RuntimeService for sensory injection (service layer, not direct NPU access)
347        let runtime_service = self
348            .runtime_service
349            .read()
350            .as_ref()
351            .ok_or_else(|| {
352                AgentError::Internal(
353                    "Runtime service not available - cannot inject stimuli".to_string(),
354                )
355            })?
356            .clone();
357
358        let mut result = HashMap::new();
359        let mut total_stimulated = 0usize;
360        let mut requested_coordinates = 0usize;
361        let mut successful_areas = 0;
362        let mut failed_areas = Vec::new();
363        let mut coordinates_not_found = 0;
364
365        // Default potential for manual stimulation (high enough to trigger firing)
366        const DEFAULT_POTENTIAL: f32 = 100.0;
367
368        // First pass: validate all cortical areas and build injection data
369        let mut injection_requests: Vec<(String, Vec<(u32, u32, u32, f32)>)> = Vec::new();
370
371        {
372            let manager = self.connectome_manager.read();
373
374            for (cortical_id, coordinates) in stimulation_payload.iter() {
375                requested_coordinates += coordinates.len();
376                let cortical_id_typed = match CorticalID::try_from_base_64(cortical_id) {
377                    Ok(id) => id,
378                    Err(e) => {
379                        failed_areas.push(cortical_id.clone());
380                        warn!("Invalid cortical ID '{}': {}", cortical_id, e);
381                        continue;
382                    }
383                };
384
385                match manager.get_cortical_area(&cortical_id_typed) {
386                    Some(_area) => {
387                        // Build xyzp_data for this cortical area (coordinates with potential)
388                        let mut xyzp_data = Vec::new();
389
390                        for coord in coordinates {
391                            if coord.len() != 3 {
392                                warn!(
393                                    "Invalid coordinate format: {:?} (expected [x, y, z])",
394                                    coord
395                                );
396                                coordinates_not_found += 1;
397                                continue;
398                            }
399
400                            let x = coord[0] as u32;
401                            let y = coord[1] as u32;
402                            let z = coord[2] as u32;
403
404                            // Add coordinate with default potential
405                            xyzp_data.push((x, y, z, DEFAULT_POTENTIAL));
406                        }
407
408                        if !xyzp_data.is_empty() {
409                            injection_requests.push((cortical_id.clone(), xyzp_data));
410                            successful_areas += 1;
411                        }
412                    }
413                    None => {
414                        failed_areas.push(cortical_id.clone());
415                    }
416                }
417            }
418        } // Drop manager lock here before await
419
420        // Second pass: perform injections (no locks held)
421        for (cortical_id, xyzp_data) in injection_requests {
422            match runtime_service
423                .inject_sensory_by_coordinates(
424                    &cortical_id,
425                    &xyzp_data,
426                    match mode {
427                        ManualStimulationMode::Candidate => {
428                            crate::traits::runtime_service::ManualStimulationMode::Candidate
429                        }
430                        ManualStimulationMode::ForceFire => {
431                            crate::traits::runtime_service::ManualStimulationMode::ForceFire
432                        }
433                    },
434                )
435                .await
436            {
437                Ok(injected_count) => {
438                    total_stimulated += injected_count;
439                    if injected_count < xyzp_data.len() {
440                        coordinates_not_found += xyzp_data.len() - injected_count;
441                    }
442                }
443                Err(e) => {
444                    error!(
445                        "❌ [MANUAL-STIMULATION] Failed to inject for area {}: {}",
446                        cortical_id, e
447                    );
448                    coordinates_not_found += xyzp_data.len();
449                }
450            }
451        }
452
453        result.insert(
454            "success".to_string(),
455            serde_json::json!(failed_areas.is_empty() && coordinates_not_found == 0),
456        );
457        result.insert(
458            "total_coordinates".to_string(),
459            serde_json::json!(total_stimulated),
460        );
461        result.insert(
462            "requested_coordinates".to_string(),
463            serde_json::json!(requested_coordinates),
464        );
465        result.insert(
466            "matched_coordinates".to_string(),
467            serde_json::json!(total_stimulated),
468        );
469        result.insert(
470            "unique_neuron_ids".to_string(),
471            serde_json::json!(total_stimulated),
472        );
473        result.insert(
474            "mode".to_string(),
475            serde_json::json!(match mode {
476                ManualStimulationMode::Candidate => "candidate",
477                ManualStimulationMode::ForceFire => "force_fire",
478            }),
479        );
480        result.insert(
481            "successful_areas".to_string(),
482            serde_json::json!(successful_areas),
483        );
484        result.insert("failed_areas".to_string(), serde_json::json!(failed_areas));
485
486        if coordinates_not_found > 0 {
487            result.insert(
488                "coordinates_not_found".to_string(),
489                serde_json::json!(coordinates_not_found),
490            );
491        }
492
493        if !failed_areas.is_empty() {
494            result.insert(
495                "error".to_string(),
496                serde_json::json!(format!("Some cortical areas not found: {:?}", failed_areas)),
497            );
498        }
499
500        Ok(result)
501    }
502
503    fn try_set_runtime_service(&self, runtime_service: Arc<dyn RuntimeServiceTrait + Send + Sync>) {
504        self.set_runtime_service(runtime_service);
505    }
506}