1use async_trait::async_trait;
10use parking_lot::RwLock;
11use std::collections::HashMap;
12use std::sync::Arc;
13use tracing::{error, info, warn};
14
15use crate::traits::agent_service::*;
16use crate::traits::registration_handler::RegistrationHandlerTrait;
17use crate::traits::RuntimeService as RuntimeServiceTrait;
18use crate::types::agent_registry::AgentRegistry;
19use crate::types::registration::RegistrationRequest;
20use feagi_brain_development::ConnectomeManager;
21use feagi_structures::genomic::cortical_area::CorticalID;
22
23pub struct AgentServiceImpl {
25 connectome_manager: Arc<RwLock<ConnectomeManager>>,
26 agent_registry: Arc<RwLock<AgentRegistry>>,
27 registration_handler: Option<Arc<dyn RegistrationHandlerTrait>>,
28 runtime_service: Arc<RwLock<Option<Arc<dyn RuntimeServiceTrait + Send + Sync>>>>,
29}
30
31impl AgentServiceImpl {
32 pub fn new(
33 connectome_manager: Arc<RwLock<ConnectomeManager>>,
34 agent_registry: Arc<RwLock<AgentRegistry>>,
35 ) -> Self {
36 Self {
37 connectome_manager,
38 agent_registry,
39 registration_handler: None,
40 runtime_service: Arc::new(RwLock::new(None)),
41 }
42 }
43
44 pub fn new_with_runtime(
46 connectome_manager: Arc<RwLock<ConnectomeManager>>,
47 agent_registry: Arc<RwLock<AgentRegistry>>,
48 runtime_service: Arc<dyn RuntimeServiceTrait + Send + Sync>,
49 ) -> Self {
50 Self {
51 connectome_manager,
52 agent_registry,
53 registration_handler: None,
54 runtime_service: Arc::new(RwLock::new(Some(runtime_service))),
55 }
56 }
57
58 pub fn set_registration_handler(&mut self, handler: Arc<dyn RegistrationHandlerTrait>) {
61 self.registration_handler = Some(handler);
62 info!("🦀 [AGENT-SERVICE] Registration handler connected");
63 }
64
65 pub fn set_runtime_service(&self, runtime_service: Arc<dyn RuntimeServiceTrait + Send + Sync>) {
67 *self.runtime_service.write() = Some(runtime_service);
68 info!("🦀 [AGENT-SERVICE] Runtime service connected");
69 }
70}
71
72#[async_trait]
73impl AgentService for AgentServiceImpl {
74 async fn register_agent(
75 &self,
76 registration: AgentRegistration,
77 ) -> AgentResult<AgentRegistrationResponse> {
78 info!(
79 "🦀 [AGENT-SERVICE] Registering agent: {} (type: {})",
80 registration.agent_id, registration.agent_type
81 );
82
83 if let Some(handler) = &self.registration_handler {
85 info!(
86 "📝 [AGENT-SERVICE] Using PNS registration handler for full transport negotiation"
87 );
88
89 let pns_request = RegistrationRequest {
91 agent_id: registration.agent_id.clone(),
92 agent_type: registration.agent_type.clone(),
93 capabilities: serde_json::to_value(®istration.capabilities)
94 .unwrap_or(serde_json::Value::Object(serde_json::Map::new())),
95 chosen_transport: registration.chosen_transport.clone(), };
97
98 let handler_clone = handler.clone();
101 let pns_response = tokio::task::spawn_blocking(move || {
102 handler_clone.process_registration(pns_request)
103 })
104 .await
105 .map_err(|e| {
106 AgentError::RegistrationFailed(format!("Registration task panicked: {:?}", e))
107 })?
108 .map_err(AgentError::RegistrationFailed)?;
109
110 let transports = pns_response.transports.map(|ts| {
112 ts.into_iter()
113 .map(|t| TransportConfig {
114 transport_type: t.transport_type,
115 enabled: t.enabled,
116 ports: t.ports,
117 host: t.host,
118 })
119 .collect()
120 });
121
122 let cortical_areas_json =
124 serde_json::to_value(&pns_response.cortical_areas).map_err(|e| {
125 error!(
126 "❌ [AGENT-SERVICE] Failed to serialize cortical areas: {}",
127 e
128 );
129 AgentError::Internal(format!("Failed to serialize cortical areas: {}", e))
130 })?;
131
132 return Ok(AgentRegistrationResponse {
133 status: pns_response.status,
134 message: pns_response
135 .message
136 .unwrap_or_else(|| "Success".to_string()),
137 success: true,
138 transport: None,
139 rates: None,
140 transports,
141 recommended_transport: pns_response.recommended_transport,
142 shm_paths: pns_response.shm_paths,
143 cortical_areas: cortical_areas_json,
144 });
145 }
146
147 return Err(AgentError::RegistrationFailed(
148 "Registration handler not available - required for FEAGI 2.0".to_string(),
149 ));
150 }
151
152 async fn heartbeat(&self, request: HeartbeatRequest) -> AgentResult<()> {
153 let agent_exists = {
155 let registry = self.agent_registry.read();
156 registry.get(&request.agent_id).is_some()
157 };
158
159 if !agent_exists {
160 let all_agents: Vec<String> = {
162 let registry = self.agent_registry.read();
163 registry
164 .get_all()
165 .iter()
166 .map(|a| a.agent_id.clone())
167 .collect()
168 };
169 warn!(
170 "⚠️ [HEARTBEAT] Agent '{}' not found in registry. Registered agents ({}): {:?}",
171 request.agent_id,
172 all_agents.len(),
173 all_agents
174 );
175 return Err(AgentError::NotFound(format!(
176 "Agent {} not found in registry (total registered: {})",
177 request.agent_id,
178 all_agents.len()
179 )));
180 }
181
182 self.agent_registry
184 .write()
185 .heartbeat(&request.agent_id)
186 .map_err(|e| {
187 error!(
188 "❌ [HEARTBEAT] Failed to update heartbeat for '{}': {}",
189 request.agent_id, e
190 );
191 AgentError::NotFound(e)
192 })?;
193 Ok(())
194 }
195
196 async fn list_agents(&self) -> AgentResult<Vec<String>> {
197 tracing::trace!(target: "feagi-services", "list_agents() called - acquiring registry read lock...");
198 let registry = self.agent_registry.read();
199 let agents = registry.get_all();
200 let agent_ids: Vec<String> = agents.iter().map(|a| a.agent_id.clone()).collect();
201
202 tracing::trace!(
203 target: "feagi-services",
204 "list_agents() found {} agents: {:?}",
205 agent_ids.len(),
206 agent_ids
207 );
208 tracing::trace!(
209 target: "feagi-services",
210 "AgentRegistry pointer: {:p}",
211 &*self.agent_registry as *const _
212 );
213
214 Ok(agent_ids)
215 }
216
217 async fn get_agent_properties(&self, agent_id: &str) -> AgentResult<AgentProperties> {
218 let registry = self.agent_registry.read();
219 let agent = registry
220 .get(agent_id)
221 .ok_or_else(|| AgentError::NotFound(format!("Agent {} not found", agent_id)))?;
222
223 let agent_ip = agent
225 .metadata
226 .get("agent_ip")
227 .and_then(|v| v.as_str())
228 .unwrap_or("127.0.0.1")
229 .to_string();
230
231 let agent_data_port = agent
232 .metadata
233 .get("agent_data_port")
234 .and_then(|v| v.as_u64())
235 .unwrap_or(0) as u16;
236
237 let agent_version = agent
238 .metadata
239 .get("agent_version")
240 .and_then(|v| v.as_str())
241 .unwrap_or("unknown")
242 .to_string();
243
244 let controller_version = agent
245 .metadata
246 .get("controller_version")
247 .and_then(|v| v.as_str())
248 .unwrap_or("unknown")
249 .to_string();
250
251 let agent_router_address = format!("tcp://{}:{}", agent_ip, agent_data_port);
252
253 let mut capabilities = HashMap::new();
255
256 if let Some(ref vision) = agent.capabilities.vision {
258 capabilities.insert(
259 "vision".to_string(),
260 serde_json::to_value(vision).unwrap_or(serde_json::Value::Null),
261 );
262 }
263
264 if let Some(ref motor) = agent.capabilities.motor {
266 let output_areas: Vec<String> = motor.source_cortical_areas.clone();
268 capabilities.insert(
269 "output".to_string(),
270 serde_json::to_value(output_areas).unwrap_or(serde_json::Value::Null),
271 );
272 }
273
274 if let Some(ref viz) = agent.capabilities.visualization {
276 capabilities.insert(
277 "visualization".to_string(),
278 serde_json::to_value(viz).unwrap_or(serde_json::Value::Null),
279 );
280 }
281
282 if let Some(ref _sensory) = agent.capabilities.sensory {
285 capabilities.insert(
288 "input".to_string(),
289 serde_json::to_value(Vec::<String>::new()).unwrap_or(serde_json::Value::Null),
290 );
291 }
292
293 for (k, v) in agent.capabilities.custom.iter() {
295 capabilities.insert(k.clone(), v.clone());
296 }
297
298 Ok(AgentProperties {
299 agent_type: agent.agent_type.to_string(),
300 agent_ip,
301 agent_data_port,
302 agent_router_address,
303 agent_version,
304 controller_version,
305 capabilities,
306 chosen_transport: agent.chosen_transport.clone(),
307 })
308 }
309
310 async fn get_shared_memory_info(
311 &self,
312 ) -> AgentResult<HashMap<String, HashMap<String, serde_json::Value>>> {
313 Ok(HashMap::new())
315 }
316
317 async fn deregister_agent(&self, agent_id: &str) -> AgentResult<()> {
318 self.agent_registry
319 .write()
320 .deregister(agent_id)
321 .map_err(AgentError::NotFound)?;
322
323 info!(
324 "✅ [AGENT-SERVICE] Agent '{}' deregistered successfully",
325 agent_id
326 );
327 Ok(())
328 }
329
330 async fn manual_stimulation(
331 &self,
332 stimulation_payload: HashMap<String, Vec<Vec<i32>>>,
333 mode: ManualStimulationMode,
334 ) -> AgentResult<HashMap<String, serde_json::Value>> {
335 let runtime_service = self
337 .runtime_service
338 .read()
339 .as_ref()
340 .ok_or_else(|| {
341 AgentError::Internal(
342 "Runtime service not available - cannot inject stimuli".to_string(),
343 )
344 })?
345 .clone();
346
347 let mut result = HashMap::new();
348 let mut total_stimulated = 0usize;
349 let mut requested_coordinates = 0usize;
350 let mut successful_areas = 0;
351 let mut failed_areas = Vec::new();
352 let mut coordinates_not_found = 0;
353
354 const DEFAULT_POTENTIAL: f32 = 100.0;
356
357 let mut injection_requests: Vec<(String, Vec<(u32, u32, u32, f32)>)> = Vec::new();
359
360 {
361 let manager = self.connectome_manager.read();
362
363 for (cortical_id, coordinates) in stimulation_payload.iter() {
364 requested_coordinates += coordinates.len();
365 let cortical_id_typed = match CorticalID::try_from_base_64(cortical_id) {
366 Ok(id) => id,
367 Err(e) => {
368 failed_areas.push(cortical_id.clone());
369 warn!("Invalid cortical ID '{}': {}", cortical_id, e);
370 continue;
371 }
372 };
373
374 match manager.get_cortical_area(&cortical_id_typed) {
375 Some(_area) => {
376 let mut xyzp_data = Vec::new();
378
379 for coord in coordinates {
380 if coord.len() != 3 {
381 warn!(
382 "Invalid coordinate format: {:?} (expected [x, y, z])",
383 coord
384 );
385 coordinates_not_found += 1;
386 continue;
387 }
388
389 let x = coord[0] as u32;
390 let y = coord[1] as u32;
391 let z = coord[2] as u32;
392
393 xyzp_data.push((x, y, z, DEFAULT_POTENTIAL));
395 }
396
397 if !xyzp_data.is_empty() {
398 injection_requests.push((cortical_id.clone(), xyzp_data));
399 successful_areas += 1;
400 }
401 }
402 None => {
403 failed_areas.push(cortical_id.clone());
404 }
405 }
406 }
407 } for (cortical_id, xyzp_data) in injection_requests {
411 match runtime_service
412 .inject_sensory_by_coordinates(
413 &cortical_id,
414 &xyzp_data,
415 match mode {
416 ManualStimulationMode::Candidate => {
417 crate::traits::runtime_service::ManualStimulationMode::Candidate
418 }
419 ManualStimulationMode::ForceFire => {
420 crate::traits::runtime_service::ManualStimulationMode::ForceFire
421 }
422 },
423 )
424 .await
425 {
426 Ok(injected_count) => {
427 total_stimulated += injected_count;
428 if injected_count < xyzp_data.len() {
429 coordinates_not_found += xyzp_data.len() - injected_count;
430 }
431 }
432 Err(e) => {
433 error!(
434 "❌ [MANUAL-STIMULATION] Failed to inject for area {}: {}",
435 cortical_id, e
436 );
437 coordinates_not_found += xyzp_data.len();
438 }
439 }
440 }
441
442 result.insert(
443 "success".to_string(),
444 serde_json::json!(failed_areas.is_empty() && coordinates_not_found == 0),
445 );
446 result.insert(
447 "total_coordinates".to_string(),
448 serde_json::json!(total_stimulated),
449 );
450 result.insert(
451 "requested_coordinates".to_string(),
452 serde_json::json!(requested_coordinates),
453 );
454 result.insert(
455 "matched_coordinates".to_string(),
456 serde_json::json!(total_stimulated),
457 );
458 result.insert(
459 "unique_neuron_ids".to_string(),
460 serde_json::json!(total_stimulated),
461 );
462 result.insert(
463 "mode".to_string(),
464 serde_json::json!(match mode {
465 ManualStimulationMode::Candidate => "candidate",
466 ManualStimulationMode::ForceFire => "force_fire",
467 }),
468 );
469 result.insert(
470 "successful_areas".to_string(),
471 serde_json::json!(successful_areas),
472 );
473 result.insert("failed_areas".to_string(), serde_json::json!(failed_areas));
474
475 if coordinates_not_found > 0 {
476 result.insert(
477 "coordinates_not_found".to_string(),
478 serde_json::json!(coordinates_not_found),
479 );
480 }
481
482 if !failed_areas.is_empty() {
483 result.insert(
484 "error".to_string(),
485 serde_json::json!(format!("Some cortical areas not found: {:?}", failed_areas)),
486 );
487 }
488
489 Ok(result)
490 }
491
492 fn try_set_runtime_service(&self, runtime_service: Arc<dyn RuntimeServiceTrait + Send + Sync>) {
493 self.set_runtime_service(runtime_service);
494 }
495}