1use async_trait::async_trait;
10use parking_lot::RwLock;
11use std::collections::HashMap;
12use std::sync::Arc;
13use tracing::{debug, error, info, warn};
14
15use crate::traits::agent_service::*;
16use crate::traits::registration_handler::RegistrationHandlerTrait;
17use crate::traits::RuntimeService as RuntimeServiceTrait;
18use crate::types::agent_registry::AgentRegistry;
19use crate::types::registration::RegistrationRequest;
20use feagi_brain_development::ConnectomeManager;
21use feagi_structures::genomic::cortical_area::CorticalID;
22
23pub struct AgentServiceImpl {
25 connectome_manager: Arc<RwLock<ConnectomeManager>>,
26 agent_registry: Arc<RwLock<AgentRegistry>>,
27 registration_handler: Option<Arc<dyn RegistrationHandlerTrait>>,
28 runtime_service: Arc<RwLock<Option<Arc<dyn RuntimeServiceTrait + Send + Sync>>>>,
29}
30
31impl AgentServiceImpl {
32 pub fn new(
33 connectome_manager: Arc<RwLock<ConnectomeManager>>,
34 agent_registry: Arc<RwLock<AgentRegistry>>,
35 ) -> Self {
36 Self {
37 connectome_manager,
38 agent_registry,
39 registration_handler: None,
40 runtime_service: Arc::new(RwLock::new(None)),
41 }
42 }
43
44 pub fn new_with_runtime(
46 connectome_manager: Arc<RwLock<ConnectomeManager>>,
47 agent_registry: Arc<RwLock<AgentRegistry>>,
48 runtime_service: Arc<dyn RuntimeServiceTrait + Send + Sync>,
49 ) -> Self {
50 Self {
51 connectome_manager,
52 agent_registry,
53 registration_handler: None,
54 runtime_service: Arc::new(RwLock::new(Some(runtime_service))),
55 }
56 }
57
58 pub fn set_registration_handler(&mut self, handler: Arc<dyn RegistrationHandlerTrait>) {
61 self.registration_handler = Some(handler);
62 info!("🦀 [AGENT-SERVICE] Registration handler connected");
63 }
64
65 pub fn set_runtime_service(&self, runtime_service: Arc<dyn RuntimeServiceTrait + Send + Sync>) {
67 let mut guard = self.runtime_service.write();
68 if let Some(existing) = guard.as_ref() {
69 if Arc::ptr_eq(existing, &runtime_service) {
70 debug!(
71 "🦀 [AGENT-SERVICE] Runtime service already connected; ignoring duplicate bind"
72 );
73 return;
74 }
75 warn!("🦀 [AGENT-SERVICE] Runtime service replaced with a new instance");
76 } else {
77 info!("🦀 [AGENT-SERVICE] Runtime service connected");
78 }
79 *guard = Some(runtime_service);
80 }
81}
82
83#[async_trait]
84impl AgentService for AgentServiceImpl {
85 async fn register_agent(
86 &self,
87 registration: AgentRegistration,
88 ) -> AgentResult<AgentRegistrationResponse> {
89 info!(
90 "🦀 [AGENT-SERVICE] Registering agent: {} (type: {})",
91 registration.agent_id, registration.agent_type
92 );
93
94 if let Some(handler) = &self.registration_handler {
96 info!(
97 "📝 [AGENT-SERVICE] Using PNS registration handler for full transport negotiation"
98 );
99
100 let pns_request = RegistrationRequest {
102 agent_id: registration.agent_id.clone(),
103 agent_type: registration.agent_type.clone(),
104 capabilities: serde_json::to_value(®istration.capabilities)
105 .unwrap_or(serde_json::Value::Object(serde_json::Map::new())),
106 chosen_transport: registration.chosen_transport.clone(), };
108
109 let handler_clone = handler.clone();
112 let pns_response = tokio::task::spawn_blocking(move || {
113 handler_clone.process_registration(pns_request)
114 })
115 .await
116 .map_err(|e| {
117 AgentError::RegistrationFailed(format!("Registration task panicked: {:?}", e))
118 })?
119 .map_err(AgentError::RegistrationFailed)?;
120
121 let transports = pns_response.transports.map(|ts| {
123 ts.into_iter()
124 .map(|t| TransportConfig {
125 transport_type: t.transport_type,
126 enabled: t.enabled,
127 ports: t.ports,
128 host: t.host,
129 })
130 .collect()
131 });
132
133 let cortical_areas_json =
135 serde_json::to_value(&pns_response.cortical_areas).map_err(|e| {
136 error!(
137 "❌ [AGENT-SERVICE] Failed to serialize cortical areas: {}",
138 e
139 );
140 AgentError::Internal(format!("Failed to serialize cortical areas: {}", e))
141 })?;
142
143 return Ok(AgentRegistrationResponse {
144 status: pns_response.status,
145 message: pns_response
146 .message
147 .unwrap_or_else(|| "Success".to_string()),
148 success: true,
149 transport: None,
150 rates: None,
151 transports,
152 recommended_transport: pns_response.recommended_transport,
153 shm_paths: pns_response.shm_paths,
154 cortical_areas: cortical_areas_json,
155 });
156 }
157
158 return Err(AgentError::RegistrationFailed(
159 "Registration handler not available - required for FEAGI 2.0".to_string(),
160 ));
161 }
162
163 async fn heartbeat(&self, request: HeartbeatRequest) -> AgentResult<()> {
164 let agent_exists = {
166 let registry = self.agent_registry.read();
167 registry.get(&request.agent_id).is_some()
168 };
169
170 if !agent_exists {
171 let all_agents: Vec<String> = {
173 let registry = self.agent_registry.read();
174 registry
175 .get_all()
176 .iter()
177 .map(|a| a.agent_id.clone())
178 .collect()
179 };
180 warn!(
181 "⚠️ [HEARTBEAT] Agent '{}' not found in registry. Registered agents ({}): {:?}",
182 request.agent_id,
183 all_agents.len(),
184 all_agents
185 );
186 return Err(AgentError::NotFound(format!(
187 "Agent {} not found in registry (total registered: {})",
188 request.agent_id,
189 all_agents.len()
190 )));
191 }
192
193 self.agent_registry
195 .write()
196 .heartbeat(&request.agent_id)
197 .map_err(|e| {
198 error!(
199 "❌ [HEARTBEAT] Failed to update heartbeat for '{}': {}",
200 request.agent_id, e
201 );
202 AgentError::NotFound(e)
203 })?;
204 Ok(())
205 }
206
207 async fn list_agents(&self) -> AgentResult<Vec<String>> {
208 tracing::trace!(target: "feagi-services", "list_agents() called - acquiring registry read lock...");
209 let registry = self.agent_registry.read();
210 let agents = registry.get_all();
211 let agent_ids: Vec<String> = agents.iter().map(|a| a.agent_id.clone()).collect();
212
213 tracing::trace!(
214 target: "feagi-services",
215 "list_agents() found {} agents: {:?}",
216 agent_ids.len(),
217 agent_ids
218 );
219 tracing::trace!(
220 target: "feagi-services",
221 "AgentRegistry pointer: {:p}",
222 &*self.agent_registry as *const _
223 );
224
225 Ok(agent_ids)
226 }
227
228 async fn get_agent_properties(&self, agent_id: &str) -> AgentResult<AgentProperties> {
229 let registry = self.agent_registry.read();
230 let agent = registry
231 .get(agent_id)
232 .ok_or_else(|| AgentError::NotFound(format!("Agent {} not found", agent_id)))?;
233
234 let agent_ip = agent
236 .metadata
237 .get("agent_ip")
238 .and_then(|v| v.as_str())
239 .unwrap_or("127.0.0.1")
240 .to_string();
241
242 let agent_data_port = agent
243 .metadata
244 .get("agent_data_port")
245 .and_then(|v| v.as_u64())
246 .unwrap_or(0) as u16;
247
248 let agent_version = agent
249 .metadata
250 .get("agent_version")
251 .and_then(|v| v.as_str())
252 .unwrap_or("unknown")
253 .to_string();
254
255 let controller_version = agent
256 .metadata
257 .get("controller_version")
258 .and_then(|v| v.as_str())
259 .unwrap_or("unknown")
260 .to_string();
261
262 let agent_router_address = format!("tcp://{}:{}", agent_ip, agent_data_port);
263
264 let mut capabilities = HashMap::new();
266
267 if let Some(ref vision) = agent.capabilities.vision {
269 capabilities.insert(
270 "vision".to_string(),
271 serde_json::to_value(vision).unwrap_or(serde_json::Value::Null),
272 );
273 }
274
275 if let Some(ref motor) = agent.capabilities.motor {
277 let output_areas: Vec<String> = motor.source_cortical_areas.clone();
279 capabilities.insert(
280 "output".to_string(),
281 serde_json::to_value(output_areas).unwrap_or(serde_json::Value::Null),
282 );
283 }
284
285 if let Some(ref viz) = agent.capabilities.visualization {
287 capabilities.insert(
288 "visualization".to_string(),
289 serde_json::to_value(viz).unwrap_or(serde_json::Value::Null),
290 );
291 }
292
293 if let Some(ref _sensory) = agent.capabilities.sensory {
296 capabilities.insert(
299 "input".to_string(),
300 serde_json::to_value(Vec::<String>::new()).unwrap_or(serde_json::Value::Null),
301 );
302 }
303
304 for (k, v) in agent.capabilities.custom.iter() {
306 capabilities.insert(k.clone(), v.clone());
307 }
308
309 Ok(AgentProperties {
310 agent_type: agent.agent_type.to_string(),
311 agent_ip,
312 agent_data_port,
313 agent_router_address,
314 agent_version,
315 controller_version,
316 capabilities,
317 chosen_transport: agent.chosen_transport.clone(),
318 })
319 }
320
321 async fn get_shared_memory_info(
322 &self,
323 ) -> AgentResult<HashMap<String, HashMap<String, serde_json::Value>>> {
324 Ok(HashMap::new())
326 }
327
328 async fn deregister_agent(&self, agent_id: &str) -> AgentResult<()> {
329 self.agent_registry
330 .write()
331 .deregister(agent_id)
332 .map_err(AgentError::NotFound)?;
333
334 info!(
335 "✅ [AGENT-SERVICE] Agent '{}' deregistered successfully",
336 agent_id
337 );
338 Ok(())
339 }
340
341 async fn manual_stimulation(
342 &self,
343 stimulation_payload: HashMap<String, Vec<Vec<i32>>>,
344 mode: ManualStimulationMode,
345 ) -> AgentResult<HashMap<String, serde_json::Value>> {
346 let runtime_service = self
348 .runtime_service
349 .read()
350 .as_ref()
351 .ok_or_else(|| {
352 AgentError::Internal(
353 "Runtime service not available - cannot inject stimuli".to_string(),
354 )
355 })?
356 .clone();
357
358 let mut result = HashMap::new();
359 let mut total_stimulated = 0usize;
360 let mut requested_coordinates = 0usize;
361 let mut successful_areas = 0;
362 let mut failed_areas = Vec::new();
363 let mut coordinates_not_found = 0;
364
365 const DEFAULT_POTENTIAL: f32 = 100.0;
367
368 let mut injection_requests: Vec<(String, Vec<(u32, u32, u32, f32)>)> = Vec::new();
370
371 {
372 let manager = self.connectome_manager.read();
373
374 for (cortical_id, coordinates) in stimulation_payload.iter() {
375 requested_coordinates += coordinates.len();
376 let cortical_id_typed = match CorticalID::try_from_base_64(cortical_id) {
377 Ok(id) => id,
378 Err(e) => {
379 failed_areas.push(cortical_id.clone());
380 warn!("Invalid cortical ID '{}': {}", cortical_id, e);
381 continue;
382 }
383 };
384
385 match manager.get_cortical_area(&cortical_id_typed) {
386 Some(_area) => {
387 let mut xyzp_data = Vec::new();
389
390 for coord in coordinates {
391 if coord.len() != 3 {
392 warn!(
393 "Invalid coordinate format: {:?} (expected [x, y, z])",
394 coord
395 );
396 coordinates_not_found += 1;
397 continue;
398 }
399
400 let x = coord[0] as u32;
401 let y = coord[1] as u32;
402 let z = coord[2] as u32;
403
404 xyzp_data.push((x, y, z, DEFAULT_POTENTIAL));
406 }
407
408 if !xyzp_data.is_empty() {
409 injection_requests.push((cortical_id.clone(), xyzp_data));
410 successful_areas += 1;
411 }
412 }
413 None => {
414 failed_areas.push(cortical_id.clone());
415 }
416 }
417 }
418 } for (cortical_id, xyzp_data) in injection_requests {
422 match runtime_service
423 .inject_sensory_by_coordinates(
424 &cortical_id,
425 &xyzp_data,
426 match mode {
427 ManualStimulationMode::Candidate => {
428 crate::traits::runtime_service::ManualStimulationMode::Candidate
429 }
430 ManualStimulationMode::ForceFire => {
431 crate::traits::runtime_service::ManualStimulationMode::ForceFire
432 }
433 },
434 )
435 .await
436 {
437 Ok(injected_count) => {
438 total_stimulated += injected_count;
439 if injected_count < xyzp_data.len() {
440 coordinates_not_found += xyzp_data.len() - injected_count;
441 }
442 }
443 Err(e) => {
444 error!(
445 "❌ [MANUAL-STIMULATION] Failed to inject for area {}: {}",
446 cortical_id, e
447 );
448 coordinates_not_found += xyzp_data.len();
449 }
450 }
451 }
452
453 result.insert(
454 "success".to_string(),
455 serde_json::json!(failed_areas.is_empty() && coordinates_not_found == 0),
456 );
457 result.insert(
458 "total_coordinates".to_string(),
459 serde_json::json!(total_stimulated),
460 );
461 result.insert(
462 "requested_coordinates".to_string(),
463 serde_json::json!(requested_coordinates),
464 );
465 result.insert(
466 "matched_coordinates".to_string(),
467 serde_json::json!(total_stimulated),
468 );
469 result.insert(
470 "unique_neuron_ids".to_string(),
471 serde_json::json!(total_stimulated),
472 );
473 result.insert(
474 "mode".to_string(),
475 serde_json::json!(match mode {
476 ManualStimulationMode::Candidate => "candidate",
477 ManualStimulationMode::ForceFire => "force_fire",
478 }),
479 );
480 result.insert(
481 "successful_areas".to_string(),
482 serde_json::json!(successful_areas),
483 );
484 result.insert("failed_areas".to_string(), serde_json::json!(failed_areas));
485
486 if coordinates_not_found > 0 {
487 result.insert(
488 "coordinates_not_found".to_string(),
489 serde_json::json!(coordinates_not_found),
490 );
491 }
492
493 if !failed_areas.is_empty() {
494 result.insert(
495 "error".to_string(),
496 serde_json::json!(format!("Some cortical areas not found: {:?}", failed_areas)),
497 );
498 }
499
500 Ok(result)
501 }
502
503 fn try_set_runtime_service(&self, runtime_service: Arc<dyn RuntimeServiceTrait + Send + Sync>) {
504 self.set_runtime_service(runtime_service);
505 }
506}