1use async_trait::async_trait;
10use parking_lot::RwLock;
11use std::collections::HashMap;
12use std::sync::Arc;
13use tracing::{debug, error, info, warn};
14
15use crate::traits::agent_service::*;
16use crate::traits::registration_handler::RegistrationHandlerTrait;
17use crate::traits::RuntimeService as RuntimeServiceTrait;
18use crate::types::agent_registry::AgentRegistry;
19use crate::types::registration::RegistrationRequest;
20use feagi_brain_development::ConnectomeManager;
21use feagi_structures::genomic::cortical_area::CorticalID;
22
23pub struct AgentServiceImpl {
25 connectome_manager: Arc<RwLock<ConnectomeManager>>,
26 agent_registry: Arc<RwLock<AgentRegistry>>,
27 registration_handler: Option<Arc<dyn RegistrationHandlerTrait>>,
28 runtime_service: Arc<RwLock<Option<Arc<dyn RuntimeServiceTrait + Send + Sync>>>>,
29}
30
31impl AgentServiceImpl {
32 pub fn new(
33 connectome_manager: Arc<RwLock<ConnectomeManager>>,
34 agent_registry: Arc<RwLock<AgentRegistry>>,
35 ) -> Self {
36 Self {
37 connectome_manager,
38 agent_registry,
39 registration_handler: None,
40 runtime_service: Arc::new(RwLock::new(None)),
41 }
42 }
43
44 pub fn new_with_runtime(
46 connectome_manager: Arc<RwLock<ConnectomeManager>>,
47 agent_registry: Arc<RwLock<AgentRegistry>>,
48 runtime_service: Arc<dyn RuntimeServiceTrait + Send + Sync>,
49 ) -> Self {
50 Self {
51 connectome_manager,
52 agent_registry,
53 registration_handler: None,
54 runtime_service: Arc::new(RwLock::new(Some(runtime_service))),
55 }
56 }
57
58 pub fn set_registration_handler(&mut self, handler: Arc<dyn RegistrationHandlerTrait>) {
61 self.registration_handler = Some(handler);
62 info!("🦀 [AGENT-SERVICE] Registration handler connected");
63 }
64
65 pub fn set_runtime_service(&self, runtime_service: Arc<dyn RuntimeServiceTrait + Send + Sync>) {
67 let mut guard = self.runtime_service.write();
68 if let Some(existing) = guard.as_ref() {
69 if Arc::ptr_eq(existing, &runtime_service) {
70 debug!(
71 "🦀 [AGENT-SERVICE] Runtime service already connected; ignoring duplicate bind"
72 );
73 return;
74 }
75 warn!("🦀 [AGENT-SERVICE] Runtime service replaced with a new instance");
76 } else {
77 info!("🦀 [AGENT-SERVICE] Runtime service connected");
78 }
79 *guard = Some(runtime_service);
80 }
81}
82
83#[async_trait]
84impl AgentService for AgentServiceImpl {
85 async fn register_agent(
86 &self,
87 registration: AgentRegistration,
88 ) -> AgentResult<AgentRegistrationResponse> {
89 info!(
90 "🦀 [AGENT-SERVICE] Registering agent: {} (type: {})",
91 registration.agent_id, registration.agent_type
92 );
93
94 if let Some(handler) = &self.registration_handler {
96 info!(
97 "📝 [AGENT-SERVICE] Using PNS registration handler for full transport negotiation"
98 );
99
100 let pns_request = RegistrationRequest {
102 agent_id: registration.agent_id.clone(),
103 agent_type: registration.agent_type.clone(),
104 capabilities: serde_json::to_value(®istration.capabilities)
105 .unwrap_or(serde_json::Value::Object(serde_json::Map::new())),
106 chosen_transport: registration.chosen_transport.clone(), };
108
109 let handler_clone = handler.clone();
112 let pns_response = tokio::task::spawn_blocking(move || {
113 handler_clone.process_registration(pns_request)
114 })
115 .await
116 .map_err(|e| {
117 AgentError::RegistrationFailed(format!("Registration task panicked: {:?}", e))
118 })?
119 .map_err(AgentError::RegistrationFailed)?;
120
121 let transports = pns_response.transports.map(|ts| {
123 ts.into_iter()
124 .map(|t| TransportConfig {
125 transport_type: t.transport_type,
126 enabled: t.enabled,
127 ports: t.ports,
128 host: t.host,
129 })
130 .collect()
131 });
132
133 let cortical_areas_json =
135 serde_json::to_value(&pns_response.cortical_areas).map_err(|e| {
136 error!(
137 "❌ [AGENT-SERVICE] Failed to serialize cortical areas: {}",
138 e
139 );
140 AgentError::Internal(format!("Failed to serialize cortical areas: {}", e))
141 })?;
142
143 return Ok(AgentRegistrationResponse {
144 status: pns_response.status,
145 message: pns_response
146 .message
147 .unwrap_or_else(|| "Success".to_string()),
148 success: true,
149 transport: None,
150 rates: None,
151 transports,
152 recommended_transport: pns_response.recommended_transport,
153 shm_paths: pns_response.shm_paths,
154 cortical_areas: cortical_areas_json,
155 });
156 }
157
158 return Err(AgentError::RegistrationFailed(
159 "Registration handler not available - required for FEAGI 2.0".to_string(),
160 ));
161 }
162
163 async fn heartbeat(&self, request: HeartbeatRequest) -> AgentResult<()> {
164 let agent_exists = {
166 let registry = self.agent_registry.read();
167 registry.get(&request.agent_id).is_some()
168 };
169
170 if !agent_exists {
171 let all_agents: Vec<String> = {
173 let registry = self.agent_registry.read();
174 registry
175 .get_all()
176 .iter()
177 .map(|a| a.agent_id.clone())
178 .collect()
179 };
180 warn!(
181 "⚠️ [HEARTBEAT] Agent '{}' not found in registry. Registered agents ({}): {:?}",
182 request.agent_id,
183 all_agents.len(),
184 all_agents
185 );
186 return Err(AgentError::NotFound(format!(
187 "Agent {} not found in registry (total registered: {})",
188 request.agent_id,
189 all_agents.len()
190 )));
191 }
192
193 self.agent_registry
195 .write()
196 .heartbeat(&request.agent_id)
197 .map_err(|e| {
198 error!(
199 "❌ [HEARTBEAT] Failed to update heartbeat for '{}': {}",
200 request.agent_id, e
201 );
202 AgentError::NotFound(e)
203 })?;
204 Ok(())
205 }
206
207 async fn list_agents(&self) -> AgentResult<Vec<String>> {
208 tracing::trace!(target: "feagi-services", "list_agents() called - acquiring registry read lock...");
209 let registry = self.agent_registry.read();
210 let agents = registry.get_all();
211 let agent_ids: Vec<String> = agents.iter().map(|a| a.agent_id.clone()).collect();
212
213 tracing::trace!(
214 target: "feagi-services",
215 "list_agents() found {} agents: {:?}",
216 agent_ids.len(),
217 agent_ids
218 );
219 tracing::trace!(
220 target: "feagi-services",
221 "AgentRegistry pointer: {:p}",
222 &*self.agent_registry as *const _
223 );
224
225 Ok(agent_ids)
226 }
227
228 async fn get_agent_properties(&self, agent_id: &str) -> AgentResult<AgentProperties> {
229 let registry = self.agent_registry.read();
230 let agent = registry
231 .get(agent_id)
232 .ok_or_else(|| AgentError::NotFound(format!("Agent {} not found", agent_id)))?;
233
234 let agent_ip = agent
236 .metadata
237 .get("agent_ip")
238 .and_then(|v| v.as_str())
239 .unwrap_or("127.0.0.1")
240 .to_string();
241
242 let agent_data_port = agent
243 .metadata
244 .get("agent_data_port")
245 .and_then(|v| v.as_u64())
246 .unwrap_or(0) as u16;
247
248 let agent_version = agent
249 .metadata
250 .get("agent_version")
251 .and_then(|v| v.as_str())
252 .unwrap_or("unknown")
253 .to_string();
254
255 let controller_version = agent
256 .metadata
257 .get("controller_version")
258 .and_then(|v| v.as_str())
259 .unwrap_or("unknown")
260 .to_string();
261
262 let agent_router_address = format!("tcp://{}:{}", agent_ip, agent_data_port);
263
264 let mut capabilities = HashMap::new();
266
267 if let Some(ref vision) = agent.capabilities.vision {
269 capabilities.insert(
270 "vision".to_string(),
271 serde_json::to_value(vision).unwrap_or(serde_json::Value::Null),
272 );
273 }
274
275 if let Some(ref motor) = agent.capabilities.motor {
277 let output_areas: Vec<String> = motor.source_cortical_areas.clone();
279 capabilities.insert(
280 "output".to_string(),
281 serde_json::to_value(output_areas).unwrap_or(serde_json::Value::Null),
282 );
283 }
284
285 if let Some(ref viz) = agent.capabilities.visualization {
287 capabilities.insert(
288 "visualization".to_string(),
289 serde_json::to_value(viz).unwrap_or(serde_json::Value::Null),
290 );
291 }
292
293 if let Some(ref _sensory) = agent.capabilities.sensory {
296 capabilities.insert(
299 "input".to_string(),
300 serde_json::to_value(Vec::<String>::new()).unwrap_or(serde_json::Value::Null),
301 );
302 }
303
304 for (k, v) in agent.capabilities.custom.iter() {
306 capabilities.insert(k.clone(), v.clone());
307 }
308
309 Ok(AgentProperties {
310 agent_type: agent.agent_type.to_string(),
311 agent_ip,
312 agent_data_port,
313 agent_router_address,
314 agent_version,
315 controller_version,
316 capabilities,
317 chosen_transport: agent.chosen_transport.clone(),
318 })
319 }
320
321 async fn get_shared_memory_info(
322 &self,
323 ) -> AgentResult<HashMap<String, HashMap<String, serde_json::Value>>> {
324 Ok(HashMap::new())
326 }
327
328 async fn deregister_agent(&self, agent_id: &str) -> AgentResult<()> {
329 self.agent_registry
330 .write()
331 .deregister(agent_id)
332 .map_err(AgentError::NotFound)?;
333
334 info!(
335 "✅ [AGENT-SERVICE] Agent '{}' deregistered successfully",
336 agent_id
337 );
338 Ok(())
339 }
340
341 async fn manual_stimulation(
342 &self,
343 stimulation_payload: HashMap<String, Vec<Vec<i32>>>,
344 mode: ManualStimulationMode,
345 ) -> AgentResult<HashMap<String, serde_json::Value>> {
346 let _ = mode;
350
351 let runtime_service = self
353 .runtime_service
354 .read()
355 .as_ref()
356 .ok_or_else(|| {
357 AgentError::Internal(
358 "Runtime service not available - cannot inject stimuli".to_string(),
359 )
360 })?
361 .clone();
362
363 let mut result = HashMap::new();
364 let mut total_stimulated = 0usize;
365 let mut requested_coordinates = 0usize;
366 let mut successful_areas = 0;
367 let mut failed_areas = Vec::new();
368 let mut coordinates_not_found = 0;
369
370 const DEFAULT_POTENTIAL: f32 = 100.0;
372
373 let mut injection_requests: Vec<(String, Vec<(u32, u32, u32, f32)>)> = Vec::new();
375
376 {
377 let manager = self.connectome_manager.read();
378
379 for (cortical_id, coordinates) in stimulation_payload.iter() {
380 requested_coordinates += coordinates.len();
381 let cortical_id_typed = match CorticalID::try_from_base_64(cortical_id) {
382 Ok(id) => id,
383 Err(e) => {
384 failed_areas.push(cortical_id.clone());
385 warn!("Invalid cortical ID '{}': {}", cortical_id, e);
386 continue;
387 }
388 };
389
390 match manager.get_cortical_area(&cortical_id_typed) {
391 Some(_area) => {
392 let mut xyzp_data = Vec::new();
394
395 for coord in coordinates {
396 if coord.len() != 3 {
397 warn!(
398 "Invalid coordinate format: {:?} (expected [x, y, z])",
399 coord
400 );
401 coordinates_not_found += 1;
402 continue;
403 }
404
405 let x = coord[0] as u32;
406 let y = coord[1] as u32;
407 let z = coord[2] as u32;
408
409 xyzp_data.push((x, y, z, DEFAULT_POTENTIAL));
411 }
412
413 if !xyzp_data.is_empty() {
414 injection_requests.push((cortical_id.clone(), xyzp_data));
415 successful_areas += 1;
416 }
417 }
418 None => {
419 failed_areas.push(cortical_id.clone());
420 }
421 }
422 }
423 } for (cortical_id, xyzp_data) in injection_requests {
427 match runtime_service
428 .inject_sensory_by_coordinates(
429 &cortical_id,
430 &xyzp_data,
431 crate::traits::runtime_service::ManualStimulationMode::ForceFire,
432 )
433 .await
434 {
435 Ok(injected_count) => {
436 total_stimulated += injected_count;
437 if injected_count < xyzp_data.len() {
438 coordinates_not_found += xyzp_data.len() - injected_count;
439 }
440 }
441 Err(e) => {
442 error!(
443 "❌ [MANUAL-STIMULATION] Failed to inject for area {}: {}",
444 cortical_id, e
445 );
446 coordinates_not_found += xyzp_data.len();
447 }
448 }
449 }
450
451 result.insert(
452 "success".to_string(),
453 serde_json::json!(failed_areas.is_empty() && coordinates_not_found == 0),
454 );
455 result.insert(
456 "total_coordinates".to_string(),
457 serde_json::json!(total_stimulated),
458 );
459 result.insert(
460 "requested_coordinates".to_string(),
461 serde_json::json!(requested_coordinates),
462 );
463 result.insert(
464 "matched_coordinates".to_string(),
465 serde_json::json!(total_stimulated),
466 );
467 result.insert(
468 "unique_neuron_ids".to_string(),
469 serde_json::json!(total_stimulated),
470 );
471 result.insert("mode".to_string(), serde_json::json!("force_fire"));
472 result.insert(
473 "successful_areas".to_string(),
474 serde_json::json!(successful_areas),
475 );
476 result.insert("failed_areas".to_string(), serde_json::json!(failed_areas));
477
478 if coordinates_not_found > 0 {
479 result.insert(
480 "coordinates_not_found".to_string(),
481 serde_json::json!(coordinates_not_found),
482 );
483 }
484
485 if !failed_areas.is_empty() {
486 result.insert(
487 "error".to_string(),
488 serde_json::json!(format!("Some cortical areas not found: {:?}", failed_areas)),
489 );
490 }
491
492 Ok(result)
493 }
494
495 fn try_set_runtime_service(&self, runtime_service: Arc<dyn RuntimeServiceTrait + Send + Sync>) {
496 self.set_runtime_service(runtime_service);
497 }
498}