1use crate::core::config::AgentConfig;
7use crate::core::error::{Result, SdkError};
8use crate::core::heartbeat::HeartbeatService;
9use crate::core::reconnect::{retry_with_backoff, ReconnectionStrategy};
10use feagi_io::AgentType;
11use futures::FutureExt;
12use std::future::Future;
13use std::net::{TcpStream, ToSocketAddrs};
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::{Arc, Mutex};
16use std::time::Duration;
17use std::time::SystemTime;
18use tokio::runtime::Handle;
19use tokio::runtime::Runtime;
20use tokio::task::block_in_place;
21use tokio::time::timeout;
22use tracing::{debug, error, info, trace, warn};
23use zeromq::{
24 DealerSocket, PushSocket, Socket, SocketRecv, SocketSend, SubSocket, ZmqError, ZmqMessage,
25};
26
27fn block_on_with<T>(
28 handle: &Handle,
29 runtime: Option<&Runtime>,
30 future: impl Future<Output = T>,
31) -> T {
32 if Handle::try_current().is_ok() {
33 block_in_place(|| handle.block_on(future))
34 } else if let Some(runtime) = runtime {
35 runtime.block_on(future)
36 } else {
37 handle.block_on(future)
38 }
39}
40
41pub struct AgentClient {
67 config: AgentConfig,
69
70 runtime_handle: Handle,
72
73 runtime: Option<Arc<Runtime>>,
75
76 registration_socket: Option<Arc<Mutex<DealerSocket>>>,
78
79 sensory_socket: Option<Arc<Mutex<PushSocket>>>,
81
82 motor_socket: Option<Arc<Mutex<SubSocket>>>,
84
85 viz_socket: Option<Arc<Mutex<SubSocket>>>,
87
88 control_socket: Option<Arc<Mutex<DealerSocket>>>,
90
91 heartbeat: Option<HeartbeatService>,
93
94 registered: bool,
96
97 last_reconnect_attempt_ms: AtomicU64,
99
100 sensory_connected: AtomicBool,
102
103 last_registration_body: Option<serde_json::Value>,
111}
112
113impl AgentClient {
114 pub fn new(config: AgentConfig) -> Result<Self> {
120 config.validate()?;
122
123 let (runtime_handle, runtime) = if let Ok(handle) = Handle::try_current() {
124 (handle, None)
125 } else {
126 let runtime = Runtime::new()
127 .map_err(|e| SdkError::Other(format!("Failed to create runtime: {}", e)))?;
128 let handle = runtime.handle().clone();
129 (handle, Some(Arc::new(runtime)))
130 };
131
132 Ok(Self {
133 config,
134 runtime_handle,
135 runtime,
136 registration_socket: None,
137 sensory_socket: None,
138 motor_socket: None,
139 viz_socket: None,
140 control_socket: None,
141 heartbeat: None,
142 registered: false,
143 last_registration_body: None,
144 last_reconnect_attempt_ms: AtomicU64::new(0),
145 sensory_connected: AtomicBool::new(false),
146 })
147 }
148
149 pub fn registration_body_json(&self) -> Option<&serde_json::Value> {
153 self.last_registration_body.as_ref()
154 }
155
156 pub fn connect(&mut self) -> Result<()> {
172 if self.registered {
173 return Err(SdkError::AlreadyConnected);
174 }
175
176 info!(
177 "[CLIENT] Step 0: starting connection sequence to FEAGI: {}",
178 self.config.registration_endpoint
179 );
180
181 info!("[CLIENT] Step 1: creating control sockets (registration)...");
183 let mut socket_strategy = ReconnectionStrategy::new(
184 self.config.retry_backoff_ms,
185 self.config.registration_retries,
186 );
187 retry_with_backoff(
188 || self.create_sockets(),
189 &mut socket_strategy,
190 "Socket creation",
191 )?;
192 info!("[CLIENT] Step 1: control sockets created");
193
194 info!("[CLIENT] Step 2: registering with FEAGI...");
196 let mut reg_strategy = ReconnectionStrategy::new(
197 self.config.retry_backoff_ms,
198 self.config.registration_retries,
199 );
200 retry_with_backoff(|| self.register(), &mut reg_strategy, "Registration")?;
201 info!("[CLIENT] Step 2: registration successful");
202
203 info!("[CLIENT] Step 3: connecting to data streams with retry...");
207 info!(
208 "[CLIENT] sensory endpoint: {}",
209 self.config.sensory_endpoint
210 );
211 if matches!(self.config.agent_type, AgentType::Motor | AgentType::Both) {
212 info!("[CLIENT] motor endpoint: {}", self.config.motor_endpoint);
213 }
214 let mut data_socket_strategy = ReconnectionStrategy::new(
215 self.config.retry_backoff_ms,
216 self.config.registration_retries,
217 );
218 retry_with_backoff(
219 || self.connect_data_sockets(),
220 &mut data_socket_strategy,
221 "Data socket creation",
222 )?;
223 info!("[CLIENT] Step 3: data sockets connected");
224
225 info!("[CLIENT] Step 4: starting heartbeat service...");
227 if self.config.heartbeat_interval > 0.0 {
228 self.start_heartbeat()?;
229 info!("[CLIENT] Step 4: heartbeat service started");
230 } else {
231 info!("[CLIENT] Step 4: heartbeat disabled (interval = 0)");
232 }
233
234 info!(
235 "[CLIENT] fully connected to FEAGI as agent: {}",
236 self.config.agent_id
237 );
238 Ok(())
239 }
240
241 fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
242 block_on_with(&self.runtime_handle, self.runtime.as_deref(), future)
243 }
244
245 fn create_sockets(&mut self) -> Result<()> {
247 let mut reg_socket = DealerSocket::new();
249 let reg_endpoint = self.config.registration_endpoint.clone();
250 self.block_on(reg_socket.connect(®_endpoint))
251 .map_err(|e: ZmqError| {
252 SdkError::Other(format!("registration endpoint {}: {}", reg_endpoint, e))
253 })?;
254 self.registration_socket = Some(Arc::new(Mutex::new(reg_socket)));
255
256 if matches!(self.config.agent_type, AgentType::Infrastructure) {
258 let mut control_socket = DealerSocket::new();
259 self.block_on(control_socket.connect(&self.config.control_endpoint))
260 .map_err(SdkError::Zmq)?;
261 self.control_socket = Some(Arc::new(Mutex::new(control_socket)));
262 debug!("[CLIENT] ✓ Control/API socket created");
263 }
264
265 debug!("[CLIENT] ✓ ZMQ control sockets created");
266 Ok(())
267 }
268
269 fn connect_data_sockets(&mut self) -> Result<()> {
271 self.sensory_connected.store(false, Ordering::Relaxed);
273 self.wait_for_tcp_endpoint("sensory", &self.config.sensory_endpoint)?;
274 let mut sensory_socket = PushSocket::new();
275 let sensory_endpoint = self.config.sensory_endpoint.clone();
276 self.block_on(sensory_socket.connect(&sensory_endpoint))
277 .map_err(|e: ZmqError| {
278 SdkError::Other(format!("sensory endpoint {}: {}", sensory_endpoint, e))
279 })?;
280 self.sensory_socket = Some(Arc::new(Mutex::new(sensory_socket)));
281
282 if matches!(self.config.agent_type, AgentType::Motor | AgentType::Both) {
284 info!(
285 "[SDK-CONNECT] 🎮 Initializing motor socket for agent '{}' (type: {:?})",
286 self.config.agent_id, self.config.agent_type
287 );
288 info!(
289 "[SDK-CONNECT] 🎮 Motor endpoint: {}",
290 self.config.motor_endpoint
291 );
292
293 self.wait_for_tcp_endpoint("motor", &self.config.motor_endpoint)?;
294 let mut motor_socket = SubSocket::new();
295 let motor_endpoint = self.config.motor_endpoint.clone();
296 self.block_on(motor_socket.connect(&motor_endpoint))
297 .map_err(|e: ZmqError| {
298 SdkError::Other(format!("motor endpoint {}: {}", motor_endpoint, e))
299 })?;
300 info!("[SDK-CONNECT] ✅ Motor socket connected");
301
302 info!("[SDK-CONNECT] 🎮 Subscribing to all motor topics");
312 self.block_on(motor_socket.subscribe(""))
313 .map_err(SdkError::Zmq)?;
314 info!("[SDK-CONNECT] ✅ Motor subscription set (all topics)");
315
316 self.motor_socket = Some(Arc::new(Mutex::new(motor_socket)));
317 info!("[SDK-CONNECT] ✅ Motor socket initialized successfully");
318 } else {
319 info!(
320 "[SDK-CONNECT] ⚠️ Motor socket NOT initialized (agent type: {:?})",
321 self.config.agent_type
322 );
323 }
324
325 if matches!(
327 self.config.agent_type,
328 AgentType::Visualization | AgentType::Infrastructure
329 ) {
330 self.wait_for_tcp_endpoint("visualization", &self.config.visualization_endpoint)?;
331 let mut viz_socket = SubSocket::new();
332 let viz_endpoint = self.config.visualization_endpoint.clone();
333 self.block_on(viz_socket.connect(&viz_endpoint))
334 .map_err(|e: ZmqError| {
335 SdkError::Other(format!("visualization endpoint {}: {}", viz_endpoint, e))
336 })?;
337
338 self.block_on(viz_socket.subscribe(""))
340 .map_err(SdkError::Zmq)?;
341 self.viz_socket = Some(Arc::new(Mutex::new(viz_socket)));
342 debug!("[CLIENT] ✓ Visualization socket created");
343 }
344
345 debug!("[CLIENT] ✓ ZMQ data sockets created");
346 Ok(())
347 }
348
349 pub fn reconnect_data_streams(&mut self) -> Result<()> {
351 if !self.registered {
352 return Err(SdkError::NotRegistered);
353 }
354
355 let now_ms = SystemTime::now()
356 .duration_since(SystemTime::UNIX_EPOCH)
357 .unwrap_or_default()
358 .as_millis() as u64;
359 let last_ms = self.last_reconnect_attempt_ms.load(Ordering::Relaxed);
360 let min_interval_ms = self.config.retry_backoff_ms.max(1);
361 if now_ms.saturating_sub(last_ms) < min_interval_ms {
362 return Ok(());
363 }
364 self.last_reconnect_attempt_ms
365 .store(now_ms, Ordering::Relaxed);
366
367 self.close_data_sockets();
368
369 let mut data_socket_strategy = ReconnectionStrategy::new(
370 self.config.retry_backoff_ms,
371 self.config.registration_retries,
372 );
373 retry_with_backoff(
374 || self.connect_data_sockets(),
375 &mut data_socket_strategy,
376 "Data socket reconnection",
377 )
378 }
379
380 fn close_data_sockets(&mut self) {
381 self.sensory_connected.store(false, Ordering::Relaxed);
382 if let Some(socket) = self.sensory_socket.take() {
383 if let Ok(socket) = Arc::try_unwrap(socket) {
384 match socket.into_inner() {
385 Ok(socket) => {
386 let _ = self.block_on(socket.close());
387 }
388 Err(poisoned) => {
389 let _ = self.block_on(poisoned.into_inner().close());
390 }
391 }
392 }
393 }
394 if let Some(socket) = self.motor_socket.take() {
395 if let Ok(socket) = Arc::try_unwrap(socket) {
396 match socket.into_inner() {
397 Ok(socket) => {
398 let _ = self.block_on(socket.close());
399 }
400 Err(poisoned) => {
401 let _ = self.block_on(poisoned.into_inner().close());
402 }
403 }
404 }
405 }
406 if let Some(socket) = self.viz_socket.take() {
407 if let Ok(socket) = Arc::try_unwrap(socket) {
408 match socket.into_inner() {
409 Ok(socket) => {
410 let _ = self.block_on(socket.close());
411 }
412 Err(poisoned) => {
413 let _ = self.block_on(poisoned.into_inner().close());
414 }
415 }
416 }
417 }
418 }
419
420 fn wait_for_tcp_endpoint(&self, label: &str, endpoint: &str) -> Result<()> {
421 let address = endpoint
422 .strip_prefix("tcp://")
423 .ok_or_else(|| {
424 SdkError::InvalidConfig(format!("Invalid {} endpoint: {}", label, endpoint))
425 })?
426 .to_string();
427 let timeout = Duration::from_millis(self.config.connection_timeout_ms);
428
429 info!("[CLIENT] checking {} endpoint: {}", label, endpoint);
430 let mut strategy = ReconnectionStrategy::new(
431 self.config.retry_backoff_ms,
432 self.config.registration_retries,
433 );
434 retry_with_backoff(
435 || {
436 let mut addrs = address.to_socket_addrs().map_err(|e| {
437 SdkError::InvalidConfig(format!(
438 "Failed to resolve {} endpoint {}: {}",
439 label, endpoint, e
440 ))
441 })?;
442 let addr = addrs.next().ok_or_else(|| {
443 SdkError::InvalidConfig(format!(
444 "No resolved addresses for {} endpoint {}",
445 label, endpoint
446 ))
447 })?;
448 TcpStream::connect_timeout(&addr, timeout).map_err(|e| {
449 SdkError::Timeout(format!(
450 "{} endpoint {} not reachable: {}",
451 label, endpoint, e
452 ))
453 })?;
454 info!("[CLIENT] ✅ {} endpoint is reachable", label);
455 Ok(())
456 },
457 &mut strategy,
458 &format!("{} endpoint availability", label),
459 )
460 }
461
462 fn send_request(
463 &self,
464 socket: &Arc<Mutex<DealerSocket>>,
465 request: &serde_json::Value,
466 ) -> Result<serde_json::Value> {
467 let mut socket = socket
468 .lock()
469 .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
470
471 let message = ZmqMessage::from(request.to_string().into_bytes());
472 self.block_on(socket.send(message)).map_err(SdkError::Zmq)?;
473
474 let timeout_ms = self.config.connection_timeout_ms;
475 let recv_result = if timeout_ms > 0 {
476 self.block_on(async {
477 timeout(std::time::Duration::from_millis(timeout_ms), socket.recv()).await
478 })
479 .map_err(|_| SdkError::Timeout(format!("Request timed out after {}ms", timeout_ms)))?
480 .map_err(SdkError::Zmq)?
481 } else {
482 self.block_on(socket.recv()).map_err(SdkError::Zmq)?
483 };
484
485 let frames = recv_result.into_vec();
486 let payload = frames
487 .last()
488 .ok_or_else(|| SdkError::Other("Response was empty".to_string()))?;
489 let response: serde_json::Value = serde_json::from_slice(payload)?;
490 Ok(response)
491 }
492
493 fn register(&mut self) -> Result<()> {
495 let registration_msg = serde_json::json!({
496 "method": "POST",
497 "path": "/v1/agent/register",
498 "body": {
499 "agent_id": self.config.agent_id,
500 "agent_type": match self.config.agent_type {
501 AgentType::Sensory => "sensory",
502 AgentType::Motor => "motor",
503 AgentType::Both => "both",
504 AgentType::Visualization => "visualization",
505 AgentType::Infrastructure => "infrastructure",
506 },
507 "capabilities": self.config.capabilities,
508 }
509 });
510
511 let socket = self
512 .registration_socket
513 .as_ref()
514 .ok_or_else(|| SdkError::Other("Registration socket not initialized".to_string()))?;
515
516 debug!(
517 "[CLIENT] Sending registration request for: {}",
518 self.config.agent_id
519 );
520 let response = self.send_request(socket, ®istration_msg)?;
521
522 let status_code = response
524 .get("status")
525 .and_then(|s| s.as_u64())
526 .unwrap_or(500);
527 if status_code == 200 {
528 self.registered = true;
529 let empty_body = serde_json::json!({});
531 let body = response.get("body").unwrap_or(&empty_body);
532 self.last_registration_body = Some(body.clone());
533 info!("[CLIENT] ✓ Registration successful: {:?}", response);
534 Ok(())
535 } else {
536 let empty_body = serde_json::json!({});
537 let body = response.get("body").unwrap_or(&empty_body);
538 let message = body
539 .get("error")
540 .and_then(|m| m.as_str())
541 .unwrap_or("Unknown error");
542 self.last_registration_body = None;
544
545 if message.contains("already registered") {
547 warn!("[CLIENT] ⚠ Agent already registered - attempting deregistration and retry");
548 self.deregister()?;
549
550 info!("[CLIENT] Retrying registration after deregistration...");
552 std::thread::sleep(std::time::Duration::from_millis(100)); self.register_with_retry_once()
556 } else {
557 error!("[CLIENT] ✗ Registration failed: {}", message);
558 Err(SdkError::RegistrationFailed(message.to_string()))
559 }
560 }
561 }
562
563 fn register_with_retry_once(&mut self) -> Result<()> {
565 let registration_msg = serde_json::json!({
566 "method": "POST",
567 "path": "/v1/agent/register",
568 "body": {
569 "agent_id": self.config.agent_id,
570 "agent_type": match self.config.agent_type {
571 AgentType::Sensory => "sensory",
572 AgentType::Motor => "motor",
573 AgentType::Both => "both",
574 AgentType::Visualization => "visualization",
575 AgentType::Infrastructure => "infrastructure",
576 },
577 "capabilities": self.config.capabilities,
578 }
579 });
580
581 let socket = self
582 .registration_socket
583 .as_ref()
584 .ok_or_else(|| SdkError::Other("Registration socket not initialized".to_string()))?;
585
586 debug!(
587 "[CLIENT] Sending registration request (retry) for: {}",
588 self.config.agent_id
589 );
590 let response = self.send_request(socket, ®istration_msg)?;
591
592 let status_code = response
594 .get("status")
595 .and_then(|s| s.as_u64())
596 .unwrap_or(500);
597 if status_code == 200 {
598 self.registered = true;
599 let empty_body = serde_json::json!({});
601 let body = response.get("body").unwrap_or(&empty_body);
602 self.last_registration_body = Some(body.clone());
603 info!(
604 "[CLIENT] ✓ Registration successful (after retry): {:?}",
605 response
606 );
607 Ok(())
608 } else {
609 let empty_body = serde_json::json!({});
610 let body = response.get("body").unwrap_or(&empty_body);
611 let message = body
612 .get("error")
613 .and_then(|m| m.as_str())
614 .unwrap_or("Unknown error");
615 self.last_registration_body = None;
616 error!("[CLIENT] ✗ Registration retry failed: {}", message);
617 Err(SdkError::RegistrationFailed(message.to_string()))
618 }
619 }
620
621 fn deregister(&mut self) -> Result<()> {
623 if !self.registered && self.registration_socket.is_none() {
624 return Ok(()); }
626
627 info!("[CLIENT] Deregistering agent: {}", self.config.agent_id);
628
629 let deregistration_msg = serde_json::json!({
630 "method": "DELETE",
631 "path": "/v1/agent/deregister",
632 "body": {
633 "agent_id": self.config.agent_id,
634 }
635 });
636
637 if let Some(socket) = &self.registration_socket {
638 match self.send_request(socket, &deregistration_msg) {
639 Ok(response) => {
640 if response.get("status").and_then(|s| s.as_u64()) == Some(200) {
641 info!("[CLIENT] ✓ Deregistration successful");
642 } else {
643 warn!("[CLIENT] ⚠ Deregistration returned: {:?}", response);
644 }
645 }
646 Err(e) => {
647 warn!("[CLIENT] ⚠ Deregistration timeout/error: {}", e);
648 }
649 };
650 }
651
652 self.registered = false;
653 Ok(())
654 }
655
656 fn start_heartbeat(&mut self) -> Result<()> {
658 if self.heartbeat.is_some() {
659 return Ok(());
660 }
661
662 let socket = self
663 .registration_socket
664 .as_ref()
665 .ok_or_else(|| SdkError::Other("Registration socket not initialized".to_string()))?;
666
667 let agent_type = match self.config.agent_type {
668 AgentType::Sensory => "sensory",
669 AgentType::Motor => "motor",
670 AgentType::Both => "both",
671 AgentType::Visualization => "visualization",
672 AgentType::Infrastructure => "infrastructure",
673 }
674 .to_string();
675 let capabilities = serde_json::to_value(&self.config.capabilities)
676 .map_err(|e| SdkError::Other(format!("Failed to serialize capabilities: {e}")))?;
677
678 let reconnect_spec = crate::core::heartbeat::ReconnectSpec {
679 agent_id: self.config.agent_id.clone(),
680 agent_type,
681 capabilities,
682 registration_retries: self.config.registration_retries,
683 retry_backoff_ms: self.config.retry_backoff_ms,
684 };
685
686 let mut heartbeat = HeartbeatService::new(
687 self.config.agent_id.clone(),
688 Arc::clone(socket),
689 self.runtime_handle.clone(),
690 self.runtime.clone(),
691 self.config.heartbeat_interval,
692 )
693 .with_reconnect_spec(reconnect_spec);
694
695 heartbeat.start()?;
696 self.heartbeat = Some(heartbeat);
697
698 debug!(
699 "[CLIENT] ✓ Heartbeat service started (interval: {}s)",
700 self.config.heartbeat_interval
701 );
702 Ok(())
703 }
704
705 pub fn send_sensory_data(&self, neuron_pairs: Vec<(i32, f64)>) -> Result<()> {
719 if !self.registered {
720 return Err(SdkError::NotRegistered);
721 }
722
723 let socket = self
724 .sensory_socket
725 .as_ref()
726 .ok_or_else(|| SdkError::Other("Sensory socket not initialized".to_string()))?;
727
728 use feagi_structures::genomic::cortical_area::CorticalID;
731 use feagi_structures::neuron_voxels::xyzp::{
732 CorticalMappedXYZPNeuronVoxels, NeuronVoxelXYZPArrays,
733 };
734
735 let vision_cap = self
737 .config
738 .capabilities
739 .vision
740 .as_ref()
741 .ok_or_else(|| SdkError::Other("No vision capability configured".to_string()))?;
742
743 let (width, _height) = vision_cap.dimensions;
744
745 let cortical_id = if let (Some(unit), Some(group_index)) =
747 (vision_cap.unit, vision_cap.group)
748 {
749 use feagi_structures::genomic::cortical_area::io_cortical_area_configuration_flag::PercentageNeuronPositioning;
750 use feagi_structures::genomic::SensoryCorticalUnit;
751
752 let group: feagi_structures::genomic::cortical_area::descriptors::CorticalUnitIndex =
753 group_index.into();
754 let frame_change_handling = feagi_structures::genomic::cortical_area::io_cortical_area_configuration_flag::FrameChangeHandling::Absolute;
755 let percentage_neuron_positioning = PercentageNeuronPositioning::Linear;
756
757 let sensory_unit = match unit {
758 feagi_io::SensoryUnit::Infrared => SensoryCorticalUnit::Infrared,
759 feagi_io::SensoryUnit::Proximity => SensoryCorticalUnit::Proximity,
760 feagi_io::SensoryUnit::Shock => SensoryCorticalUnit::Shock,
761 feagi_io::SensoryUnit::Battery => SensoryCorticalUnit::Battery,
762 feagi_io::SensoryUnit::Servo => SensoryCorticalUnit::Servo,
763 feagi_io::SensoryUnit::AnalogGpio => SensoryCorticalUnit::AnalogGPIO,
764 feagi_io::SensoryUnit::DigitalGpio => SensoryCorticalUnit::DigitalGPIO,
765 feagi_io::SensoryUnit::MiscData => SensoryCorticalUnit::MiscData,
766 feagi_io::SensoryUnit::TextEnglishInput => SensoryCorticalUnit::TextEnglishInput,
767 feagi_io::SensoryUnit::CountInput => SensoryCorticalUnit::CountInput,
768 feagi_io::SensoryUnit::Vision => SensoryCorticalUnit::Vision,
769 feagi_io::SensoryUnit::SegmentedVision => SensoryCorticalUnit::SegmentedVision,
770 feagi_io::SensoryUnit::Accelerometer => SensoryCorticalUnit::Accelerometer,
771 feagi_io::SensoryUnit::Gyroscope => SensoryCorticalUnit::Gyroscope,
772 };
773
774 match sensory_unit {
777 SensoryCorticalUnit::Infrared => {
778 SensoryCorticalUnit::get_cortical_ids_array_for_infrared_with_parameters(
779 frame_change_handling,
780 percentage_neuron_positioning,
781 group,
782 )[0]
783 }
784 SensoryCorticalUnit::Proximity => {
785 SensoryCorticalUnit::get_cortical_ids_array_for_proximity_with_parameters(
786 frame_change_handling,
787 percentage_neuron_positioning,
788 group,
789 )[0]
790 }
791 SensoryCorticalUnit::Shock => {
792 SensoryCorticalUnit::get_cortical_ids_array_for_shock_with_parameters(
793 frame_change_handling,
794 percentage_neuron_positioning,
795 group,
796 )[0]
797 }
798 SensoryCorticalUnit::Battery => {
799 SensoryCorticalUnit::get_cortical_ids_array_for_battery_with_parameters(
800 frame_change_handling,
801 percentage_neuron_positioning,
802 group,
803 )[0]
804 }
805 SensoryCorticalUnit::Servo => {
806 SensoryCorticalUnit::get_cortical_ids_array_for_servo_with_parameters(
807 frame_change_handling,
808 percentage_neuron_positioning,
809 group,
810 )[0]
811 }
812 SensoryCorticalUnit::AnalogGPIO => {
813 SensoryCorticalUnit::get_cortical_ids_array_for_analog_g_p_i_o_with_parameters(
814 frame_change_handling,
815 percentage_neuron_positioning,
816 group,
817 )[0]
818 }
819 SensoryCorticalUnit::DigitalGPIO => {
820 SensoryCorticalUnit::get_cortical_ids_array_for_digital_g_p_i_o_with_parameters(group)[0]
821 }
822 SensoryCorticalUnit::MiscData => {
823 SensoryCorticalUnit::get_cortical_ids_array_for_misc_data_with_parameters(
824 frame_change_handling,
825 group,
826 )[0]
827 }
828 SensoryCorticalUnit::TextEnglishInput => {
829 SensoryCorticalUnit::get_cortical_ids_array_for_text_english_input_with_parameters(
830 frame_change_handling,
831 group,
832 )[0]
833 }
834 SensoryCorticalUnit::CountInput => {
835 SensoryCorticalUnit::get_cortical_ids_array_for_count_input_with_parameters(
836 frame_change_handling,
837 percentage_neuron_positioning,
838 group,
839 )[0]
840 }
841 SensoryCorticalUnit::Vision => {
842 SensoryCorticalUnit::get_cortical_ids_array_for_vision_with_parameters(
843 frame_change_handling,
844 group,
845 )[0]
846 }
847 SensoryCorticalUnit::SegmentedVision => {
848 SensoryCorticalUnit::get_cortical_ids_array_for_segmented_vision_with_parameters(
849 frame_change_handling,
850 group,
851 )[0]
852 }
853 SensoryCorticalUnit::Accelerometer => {
854 SensoryCorticalUnit::get_cortical_ids_array_for_accelerometer_with_parameters(
855 frame_change_handling,
856 percentage_neuron_positioning,
857 group,
858 )[0]
859 }
860 SensoryCorticalUnit::Gyroscope => {
861 SensoryCorticalUnit::get_cortical_ids_array_for_gyroscope_with_parameters(
862 frame_change_handling,
863 percentage_neuron_positioning,
864 group,
865 )[0]
866 }
867 }
868 } else {
869 let cortical_area = &vision_cap.target_cortical_area;
870
871 let mut bytes = [b' '; 8];
873 let name_bytes = cortical_area.as_bytes();
874 let copy_len = name_bytes.len().min(8);
875 bytes[..copy_len].copy_from_slice(&name_bytes[..copy_len]);
876 CorticalID::try_from_bytes(&bytes).map_err(|e| {
877 SdkError::Other(format!("Invalid cortical ID '{}': {:?}", cortical_area, e))
878 })?
879 };
880
881 let mut x_coords = Vec::with_capacity(neuron_pairs.len());
883 let mut y_coords = Vec::with_capacity(neuron_pairs.len());
884 let mut z_coords = Vec::with_capacity(neuron_pairs.len());
885 let mut potentials = Vec::with_capacity(neuron_pairs.len());
886
887 for (neuron_id, potential) in neuron_pairs {
888 let neuron_id = neuron_id as u32;
889 x_coords.push(neuron_id % (width as u32));
890 y_coords.push(neuron_id / (width as u32));
891 z_coords.push(0); potentials.push(potential as f32);
893 }
894
895 let _neuron_count = x_coords.len(); let neuron_arrays =
899 NeuronVoxelXYZPArrays::new_from_vectors(x_coords, y_coords, z_coords, potentials)
900 .map_err(|e| SdkError::Other(format!("Failed to create neuron arrays: {:?}", e)))?;
901
902 let cortical_id_log = cortical_id.as_base_64();
904 let mut cortical_mapped = CorticalMappedXYZPNeuronVoxels::new();
905 cortical_mapped.insert(cortical_id, neuron_arrays);
906
907 let mut byte_container = feagi_serialization::FeagiByteContainer::new_empty();
909 byte_container
910 .overwrite_byte_data_with_single_struct_data(&cortical_mapped, 0)
911 .map_err(|e| SdkError::Other(format!("Failed to serialize to container: {:?}", e)))?;
912
913 let buffer = byte_container.get_byte_ref().to_vec();
914
915 let mut socket = socket
917 .lock()
918 .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
919 let message = ZmqMessage::from(buffer.clone());
920 self.block_on(socket.send(message)).map_err(SdkError::Zmq)?;
921
922 debug!(
923 "[CLIENT] Sent {} bytes XYZP binary to {}",
924 buffer.len(),
925 cortical_id_log
926 );
927 Ok(())
928 }
929
930 pub fn send_sensory_bytes(&self, bytes: Vec<u8>) -> Result<()> {
939 let _ = self.try_send_sensory_bytes(&bytes)?;
940 Ok(())
941 }
942
943 pub fn try_send_sensory_bytes(&self, bytes: &[u8]) -> Result<bool> {
950 if !self.registered {
951 return Err(SdkError::NotRegistered);
952 }
953
954 let socket = self
955 .sensory_socket
956 .as_ref()
957 .ok_or_else(|| SdkError::Other("Sensory socket not initialized".to_string()))?;
958
959 let mut socket = socket
960 .lock()
961 .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
962 let message = ZmqMessage::from(bytes.to_vec());
963
964 if !self.sensory_connected.load(Ordering::Relaxed) {
965 let timeout_ms = self.config.connection_timeout_ms;
966 let send_result = self.block_on(async {
967 timeout(
968 std::time::Duration::from_millis(timeout_ms),
969 socket.send(message),
970 )
971 .await
972 });
973 match send_result {
974 Ok(Ok(())) => {
975 self.sensory_connected.store(true, Ordering::Relaxed);
976 debug!("[CLIENT] Sensory socket connected (first send)");
977 return Ok(true);
978 }
979 Ok(Err(ZmqError::BufferFull(_))) => return Ok(false),
980 Ok(Err(e)) => {
981 let message = e.to_string();
982 if message.contains("Not connected to peers") {
983 return Ok(false);
984 }
985 return Err(SdkError::Zmq(e));
986 }
987 Err(_) => return Ok(false),
988 }
989 }
990
991 let send_result = self.block_on(async { socket.send(message).now_or_never() });
992 match send_result {
993 Some(Ok(())) => {
994 debug!("[CLIENT] Sent {} bytes sensory (raw)", bytes.len());
995 Ok(true)
996 }
997 None | Some(Err(ZmqError::BufferFull(_))) => {
998 static DROPPED: AtomicU64 = AtomicU64::new(0);
1000 static LAST_LOG_MS: AtomicU64 = AtomicU64::new(0);
1001
1002 let dropped = DROPPED.fetch_add(1, Ordering::Relaxed) + 1;
1003 let now_ms = SystemTime::now()
1004 .duration_since(SystemTime::UNIX_EPOCH)
1005 .unwrap_or_default()
1006 .as_millis() as u64;
1007
1008 let last_ms = LAST_LOG_MS.load(Ordering::Relaxed);
1009 if now_ms.saturating_sub(last_ms) >= 5_000
1011 && LAST_LOG_MS
1012 .compare_exchange(last_ms, now_ms, Ordering::Relaxed, Ordering::Relaxed)
1013 .is_ok()
1014 {
1015 warn!(
1016 "[CLIENT] Sensory backpressure: dropped_messages={} last_payload_bytes={}",
1017 dropped,
1018 bytes.len()
1019 );
1020 }
1021
1022 Ok(false)
1023 }
1024 Some(Err(e)) => {
1025 let message = e.to_string();
1026 if message.contains("Not connected to peers") {
1027 static DROPPED: AtomicU64 = AtomicU64::new(0);
1028 static LAST_LOG_MS: AtomicU64 = AtomicU64::new(0);
1029
1030 let dropped = DROPPED.fetch_add(1, Ordering::Relaxed) + 1;
1031 let now_ms = SystemTime::now()
1032 .duration_since(SystemTime::UNIX_EPOCH)
1033 .unwrap_or_default()
1034 .as_millis() as u64;
1035
1036 let last_ms = LAST_LOG_MS.load(Ordering::Relaxed);
1037 if now_ms.saturating_sub(last_ms) >= 5_000
1038 && LAST_LOG_MS
1039 .compare_exchange(last_ms, now_ms, Ordering::Relaxed, Ordering::Relaxed)
1040 .is_ok()
1041 {
1042 warn!(
1043 "[CLIENT] Sensory not connected: dropped_messages={} last_payload_bytes={}",
1044 dropped,
1045 bytes.len()
1046 );
1047 }
1048
1049 return Ok(false);
1050 }
1051
1052 Err(SdkError::Zmq(e))
1053 }
1054 }
1055 }
1056
1057 pub fn receive_motor_data(
1074 &self,
1075 ) -> Result<Option<feagi_structures::neuron_voxels::xyzp::CorticalMappedXYZPNeuronVoxels>> {
1076 use feagi_structures::neuron_voxels::xyzp::CorticalMappedXYZPNeuronVoxels;
1077
1078 if !self.registered {
1079 return Err(SdkError::NotRegistered);
1080 }
1081
1082 let socket = self.motor_socket.as_ref().ok_or_else(|| {
1083 error!("[CLIENT] receive_motor_data() called but motor_socket is None");
1084 SdkError::Other("Motor socket not initialized (not a motor agent?)".to_string())
1085 })?;
1086
1087 let mut socket = socket
1088 .lock()
1089 .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
1090 let recv_result = self.block_on(async { socket.recv().now_or_never() });
1091 let message = match recv_result {
1092 None => return Ok(None),
1093 Some(Ok(message)) => message,
1094 Some(Err(e)) => return Err(SdkError::Zmq(e)),
1095 };
1096
1097 let mut frames = message.into_vec();
1101 if frames.is_empty() {
1102 return Ok(None);
1103 }
1104 let (_topic_opt, data) = if frames.len() == 2 {
1105 let topic = frames.remove(0).to_vec();
1106 trace!(
1107 "[CLIENT] Motor multipart topic: '{}'",
1108 String::from_utf8_lossy(&topic)
1109 );
1110 let data = frames.remove(0).to_vec();
1111 trace!("[CLIENT] Received motor data frame: {} bytes", data.len());
1112 (Some(topic), data)
1113 } else if frames.len() == 1 {
1114 (None, frames.remove(0).to_vec())
1115 } else {
1116 return Err(SdkError::Other(
1117 "Unexpected multipart motor payload".to_string(),
1118 ));
1119 };
1120
1121 let mut byte_container = feagi_serialization::FeagiByteContainer::new_empty();
1130 let mut data_vec = data.to_vec();
1131
1132 byte_container
1134 .try_write_data_to_container_and_verify(&mut |bytes| {
1135 std::mem::swap(bytes, &mut data_vec);
1136 Ok(())
1137 })
1138 .map_err(|e| SdkError::Other(format!("Failed to load motor data bytes: {:?}", e)))?;
1139
1140 let num_structures = byte_container
1142 .try_get_number_contained_structures()
1143 .map_err(|e| SdkError::Other(format!("Failed to get structure count: {:?}", e)))?;
1144
1145 if num_structures == 0 {
1146 return Ok(None);
1147 }
1148
1149 let boxed_struct = byte_container
1151 .try_create_new_struct_from_index(0)
1152 .map_err(|e| SdkError::Other(format!("Failed to extract motor structure: {:?}", e)))?;
1153
1154 let motor_data = boxed_struct
1156 .as_any()
1157 .downcast_ref::<CorticalMappedXYZPNeuronVoxels>()
1158 .ok_or_else(|| {
1159 SdkError::Other("Motor data is not CorticalMappedXYZPNeuronVoxels".to_string())
1160 })?
1161 .clone();
1162
1163 debug!(
1164 "[CLIENT] ✓ Received motor data ({} bytes, {} areas)",
1165 data.len(),
1166 motor_data.len()
1167 );
1168 Ok(Some(motor_data))
1169 }
1170
1171 pub fn receive_visualization_data(&self) -> Result<Option<Vec<u8>>> {
1183 if !self.registered {
1184 return Err(SdkError::NotRegistered);
1185 }
1186
1187 let socket = self.viz_socket.as_ref().ok_or_else(|| {
1188 SdkError::Other(
1189 "Visualization socket not initialized (not a visualization/infrastructure agent?)"
1190 .to_string(),
1191 )
1192 })?;
1193
1194 let mut socket = socket
1195 .lock()
1196 .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
1197 let recv_result = self.block_on(async { socket.recv().now_or_never() });
1198 match recv_result {
1199 None => Ok(None),
1200 Some(Ok(message)) => {
1201 let mut frames = message.into_vec();
1202 if frames.is_empty() {
1203 return Ok(None);
1204 }
1205 let data = frames.pop().unwrap().to_vec();
1206 debug!(
1207 "[CLIENT] ✓ Received visualization data ({} bytes)",
1208 data.len()
1209 );
1210 Ok(Some(data))
1211 }
1212 Some(Err(e)) => Err(SdkError::Zmq(e)),
1213 }
1214 }
1215
1216 pub fn control_request(
1233 &self,
1234 method: &str,
1235 route: &str,
1236 data: Option<serde_json::Value>,
1237 ) -> Result<serde_json::Value> {
1238 if !self.registered {
1239 return Err(SdkError::NotRegistered);
1240 }
1241
1242 let socket = self.control_socket.as_ref().ok_or_else(|| {
1243 SdkError::Other(
1244 "Control socket not initialized (not an infrastructure agent?)".to_string(),
1245 )
1246 })?;
1247
1248 let mut request = serde_json::json!({
1250 "method": method,
1251 "route": route,
1252 "headers": {"content-type": "application/json"},
1253 });
1254
1255 if let Some(body) = data {
1256 request["body"] = body;
1257 }
1258
1259 let response = self.send_request(socket, &request)?;
1260 debug!("[CLIENT] ✓ Control request {} {} completed", method, route);
1261 Ok(response)
1262 }
1263
1264 pub fn is_registered(&self) -> bool {
1266 self.registered
1267 }
1268
1269 pub fn agent_id(&self) -> &str {
1271 &self.config.agent_id
1272 }
1273}
1274
1275impl Drop for AgentClient {
1276 fn drop(&mut self) {
1277 debug!("[CLIENT] Dropping AgentClient: {}", self.config.agent_id);
1278
1279 if let Some(mut heartbeat) = self.heartbeat.take() {
1281 debug!("[CLIENT] Stopping heartbeat service before cleanup");
1282 heartbeat.stop();
1283 debug!("[CLIENT] Heartbeat service stopped");
1284 }
1285
1286 if self.registered {
1288 debug!("[CLIENT] Deregistering agent: {}", self.config.agent_id);
1289 if let Err(e) = self.deregister() {
1290 warn!("[CLIENT] Deregistration failed during drop: {}", e);
1291 }
1293 }
1294
1295 debug!(
1297 "[CLIENT] AgentClient dropped cleanly: {}",
1298 self.config.agent_id
1299 );
1300 }
1301}
1302
1303#[cfg(test)]
1304mod tests {
1305 use super::*;
1306 use feagi_io::AgentType;
1307
1308 #[test]
1309 fn test_client_creation() {
1310 let config = AgentConfig::new("test_agent", AgentType::Sensory)
1311 .with_vision_capability("camera", (640, 480), 3, "i_vision")
1312 .with_registration_endpoint("tcp://localhost:8000")
1313 .with_sensory_endpoint("tcp://localhost:5558");
1314
1315 let client = AgentClient::new(config);
1316 assert!(client.is_ok());
1317
1318 let client = client.unwrap();
1319 assert!(!client.is_registered());
1320 assert_eq!(client.agent_id(), "test_agent");
1321 }
1322
1323 }