1use crate::core::config::AgentConfig;
7use crate::core::error::{Result, SdkError};
8use crate::core::heartbeat::HeartbeatService;
9use crate::core::reconnect::{retry_with_backoff, ReconnectionStrategy};
10use feagi_io::AgentType;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex};
13use std::time::SystemTime;
14use tracing::{debug, error, info, trace, warn};
15
16pub struct AgentClient {
42 config: AgentConfig,
44
45 context: zmq::Context,
47
48 registration_socket: Option<Arc<Mutex<zmq::Socket>>>,
50
51 sensory_socket: Option<zmq::Socket>,
53
54 motor_socket: Option<zmq::Socket>,
56
57 viz_socket: Option<zmq::Socket>,
59
60 control_socket: Option<zmq::Socket>,
62
63 heartbeat: Option<HeartbeatService>,
65
66 registered: bool,
68
69 last_registration_body: Option<serde_json::Value>,
77}
78
79impl AgentClient {
80 pub fn new(config: AgentConfig) -> Result<Self> {
85 config.validate()?;
87
88 let context = zmq::Context::new();
89
90 Ok(Self {
91 config,
92 context,
93 registration_socket: None,
94 sensory_socket: None,
95 motor_socket: None,
96 viz_socket: None,
97 control_socket: None,
98 heartbeat: None,
99 registered: false,
100 last_registration_body: None,
101 })
102 }
103
104 pub fn registration_body_json(&self) -> Option<&serde_json::Value> {
108 self.last_registration_body.as_ref()
109 }
110
111 pub fn connect(&mut self) -> Result<()> {
122 if self.registered {
123 return Err(SdkError::AlreadyConnected);
124 }
125
126 info!(
127 "[CLIENT] Connecting to FEAGI: {}",
128 self.config.registration_endpoint
129 );
130
131 let mut socket_strategy = ReconnectionStrategy::new(
133 self.config.retry_backoff_ms,
134 self.config.registration_retries,
135 );
136 retry_with_backoff(
137 || self.create_sockets(),
138 &mut socket_strategy,
139 "Socket creation",
140 )?;
141
142 let mut reg_strategy = ReconnectionStrategy::new(
144 self.config.retry_backoff_ms,
145 self.config.registration_retries,
146 );
147 retry_with_backoff(|| self.register(), &mut reg_strategy, "Registration")?;
148
149 if self.config.heartbeat_interval > 0.0 {
152 debug!("[CLIENT] Starting heartbeat service (post-registration)");
153 self.start_heartbeat()?;
154 } else {
155 debug!("[CLIENT] Heartbeat disabled (interval = 0)");
156 }
157
158 info!(
159 "[CLIENT] ✓ Connected and registered as: {}",
160 self.config.agent_id
161 );
162 Ok(())
163 }
164
165 fn create_sockets(&mut self) -> Result<()> {
167 let reg_socket = self.context.socket(zmq::REQ)?;
169 reg_socket.set_rcvtimeo(self.config.connection_timeout_ms as i32)?;
170 reg_socket.set_sndtimeo(self.config.connection_timeout_ms as i32)?;
171 let _ = reg_socket.set_req_relaxed(true);
176 reg_socket.connect(&self.config.registration_endpoint)?;
177 self.registration_socket = Some(Arc::new(Mutex::new(reg_socket)));
178
179 let sensory_socket = self.context.socket(zmq::PUSH)?;
181 sensory_socket.set_sndhwm(self.config.sensory_send_hwm)?;
182 sensory_socket.set_linger(self.config.sensory_linger_ms)?;
183 sensory_socket.set_immediate(self.config.sensory_immediate)?;
184 sensory_socket.connect(&self.config.sensory_endpoint)?;
185 self.sensory_socket = Some(sensory_socket);
186
187 if matches!(self.config.agent_type, AgentType::Motor | AgentType::Both) {
189 info!(
190 "[SDK-CONNECT] 🎮 Initializing motor socket for agent '{}' (type: {:?})",
191 self.config.agent_id, self.config.agent_type
192 );
193 info!(
194 "[SDK-CONNECT] 🎮 Motor endpoint: {}",
195 self.config.motor_endpoint
196 );
197
198 let motor_socket = self.context.socket(zmq::SUB)?;
199 motor_socket.connect(&self.config.motor_endpoint)?;
200 info!("[SDK-CONNECT] ✅ Motor socket connected");
201
202 info!("[SDK-CONNECT] 🎮 Subscribing to all motor topics");
212 motor_socket.set_subscribe(b"")?;
213 info!("[SDK-CONNECT] ✅ Motor subscription set (all topics)");
214
215 self.motor_socket = Some(motor_socket);
216 info!("[SDK-CONNECT] ✅ Motor socket initialized successfully");
217 } else {
218 info!(
219 "[SDK-CONNECT] ⚠️ Motor socket NOT initialized (agent type: {:?})",
220 self.config.agent_type
221 );
222 }
223
224 if matches!(
226 self.config.agent_type,
227 AgentType::Visualization | AgentType::Infrastructure
228 ) {
229 let viz_socket = self.context.socket(zmq::SUB)?;
230 viz_socket.connect(&self.config.visualization_endpoint)?;
231
232 viz_socket.set_subscribe(b"")?;
234 self.viz_socket = Some(viz_socket);
235 debug!("[CLIENT] ✓ Visualization socket created");
236 }
237
238 if matches!(self.config.agent_type, AgentType::Infrastructure) {
240 let control_socket = self.context.socket(zmq::REQ)?;
241 control_socket.set_rcvtimeo(self.config.connection_timeout_ms as i32)?;
242 control_socket.set_sndtimeo(self.config.connection_timeout_ms as i32)?;
243 control_socket.connect(&self.config.control_endpoint)?;
244 self.control_socket = Some(control_socket);
245 debug!("[CLIENT] ✓ Control/API socket created");
246 }
247
248 debug!("[CLIENT] ✓ ZMQ sockets created");
249 Ok(())
250 }
251
252 fn register(&mut self) -> Result<()> {
254 let registration_msg = serde_json::json!({
255 "method": "POST",
256 "path": "/v1/agent/register",
257 "body": {
258 "agent_id": self.config.agent_id,
259 "agent_type": match self.config.agent_type {
260 AgentType::Sensory => "sensory",
261 AgentType::Motor => "motor",
262 AgentType::Both => "both",
263 AgentType::Visualization => "visualization",
264 AgentType::Infrastructure => "infrastructure",
265 },
266 "capabilities": self.config.capabilities,
267 }
268 });
269
270 let socket = self
271 .registration_socket
272 .as_ref()
273 .ok_or_else(|| SdkError::Other("Registration socket not initialized".to_string()))?;
274
275 let response = {
277 let socket = socket
278 .lock()
279 .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
280
281 debug!(
282 "[CLIENT] Sending registration request for: {}",
283 self.config.agent_id
284 );
285 socket.send(registration_msg.to_string().as_bytes(), 0)?;
286
287 let response_bytes = socket.recv_bytes(0)?;
289 serde_json::from_slice::<serde_json::Value>(&response_bytes)?
290 }; let status_code = response
294 .get("status")
295 .and_then(|s| s.as_u64())
296 .unwrap_or(500);
297 if status_code == 200 {
298 self.registered = true;
299 let empty_body = serde_json::json!({});
301 let body = response.get("body").unwrap_or(&empty_body);
302 self.last_registration_body = Some(body.clone());
303 info!("[CLIENT] ✓ Registration successful: {:?}", response);
304 Ok(())
305 } else {
306 let empty_body = serde_json::json!({});
307 let body = response.get("body").unwrap_or(&empty_body);
308 let message = body
309 .get("error")
310 .and_then(|m| m.as_str())
311 .unwrap_or("Unknown error");
312 self.last_registration_body = None;
314
315 if message.contains("already registered") {
317 warn!("[CLIENT] ⚠ Agent already registered - attempting deregistration and retry");
318 self.deregister()?;
319
320 info!("[CLIENT] Retrying registration after deregistration...");
322 std::thread::sleep(std::time::Duration::from_millis(100)); self.register_with_retry_once()
326 } else {
327 error!("[CLIENT] ✗ Registration failed: {}", message);
328 Err(SdkError::RegistrationFailed(message.to_string()))
329 }
330 }
331 }
332
333 fn register_with_retry_once(&mut self) -> Result<()> {
335 let registration_msg = serde_json::json!({
336 "method": "POST",
337 "path": "/v1/agent/register",
338 "body": {
339 "agent_id": self.config.agent_id,
340 "agent_type": match self.config.agent_type {
341 AgentType::Sensory => "sensory",
342 AgentType::Motor => "motor",
343 AgentType::Both => "both",
344 AgentType::Visualization => "visualization",
345 AgentType::Infrastructure => "infrastructure",
346 },
347 "capabilities": self.config.capabilities,
348 }
349 });
350
351 let socket = self
352 .registration_socket
353 .as_ref()
354 .ok_or_else(|| SdkError::Other("Registration socket not initialized".to_string()))?;
355
356 let response = {
358 let socket = socket
359 .lock()
360 .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
361
362 debug!(
363 "[CLIENT] Sending registration request (retry) for: {}",
364 self.config.agent_id
365 );
366 socket.send(registration_msg.to_string().as_bytes(), 0)?;
367
368 let response_bytes = socket.recv_bytes(0)?;
370 serde_json::from_slice::<serde_json::Value>(&response_bytes)?
371 }; let status_code = response
375 .get("status")
376 .and_then(|s| s.as_u64())
377 .unwrap_or(500);
378 if status_code == 200 {
379 self.registered = true;
380 let empty_body = serde_json::json!({});
382 let body = response.get("body").unwrap_or(&empty_body);
383 self.last_registration_body = Some(body.clone());
384 info!(
385 "[CLIENT] ✓ Registration successful (after retry): {:?}",
386 response
387 );
388 Ok(())
389 } else {
390 let empty_body = serde_json::json!({});
391 let body = response.get("body").unwrap_or(&empty_body);
392 let message = body
393 .get("error")
394 .and_then(|m| m.as_str())
395 .unwrap_or("Unknown error");
396 self.last_registration_body = None;
397 error!("[CLIENT] ✗ Registration retry failed: {}", message);
398 Err(SdkError::RegistrationFailed(message.to_string()))
399 }
400 }
401
402 fn deregister(&mut self) -> Result<()> {
404 if !self.registered && self.registration_socket.is_none() {
405 return Ok(()); }
407
408 info!("[CLIENT] Deregistering agent: {}", self.config.agent_id);
409
410 let deregistration_msg = serde_json::json!({
411 "method": "DELETE",
412 "path": "/v1/agent/deregister",
413 "body": {
414 "agent_id": self.config.agent_id,
415 }
416 });
417
418 if let Some(socket) = &self.registration_socket {
419 let socket = socket
420 .lock()
421 .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
422
423 if let Err(e) = socket.send(deregistration_msg.to_string().as_bytes(), 0) {
425 warn!("[CLIENT] ⚠ Failed to send deregistration: {}", e);
426 return Ok(()); }
428
429 match socket.recv_bytes(0) {
431 Ok(response_bytes) => {
432 let response: serde_json::Value = serde_json::from_slice(&response_bytes)?;
433 if response.get("status").and_then(|s| s.as_str()) == Some("success") {
434 info!("[CLIENT] ✓ Deregistration successful");
435 } else {
436 warn!("[CLIENT] ⚠ Deregistration returned: {:?}", response);
437 }
438 }
439 Err(e) => {
440 warn!("[CLIENT] ⚠ Deregistration timeout/error: {}", e);
441 }
442 }
443 }
444
445 self.registered = false;
446 Ok(())
447 }
448
449 fn start_heartbeat(&mut self) -> Result<()> {
451 if self.heartbeat.is_some() {
452 return Ok(());
453 }
454
455 let socket = self
456 .registration_socket
457 .as_ref()
458 .ok_or_else(|| SdkError::Other("Registration socket not initialized".to_string()))?;
459
460 let agent_type = match self.config.agent_type {
461 AgentType::Sensory => "sensory",
462 AgentType::Motor => "motor",
463 AgentType::Both => "both",
464 AgentType::Visualization => "visualization",
465 AgentType::Infrastructure => "infrastructure",
466 }
467 .to_string();
468 let capabilities = serde_json::to_value(&self.config.capabilities)
469 .map_err(|e| SdkError::Other(format!("Failed to serialize capabilities: {e}")))?;
470
471 let reconnect_spec = crate::core::heartbeat::ReconnectSpec {
472 agent_id: self.config.agent_id.clone(),
473 agent_type,
474 capabilities,
475 registration_retries: self.config.registration_retries,
476 retry_backoff_ms: self.config.retry_backoff_ms,
477 };
478
479 let mut heartbeat = HeartbeatService::new(
480 self.config.agent_id.clone(),
481 Arc::clone(socket),
482 self.config.heartbeat_interval,
483 )
484 .with_reconnect_spec(reconnect_spec);
485
486 heartbeat.start()?;
487 self.heartbeat = Some(heartbeat);
488
489 debug!(
490 "[CLIENT] ✓ Heartbeat service started (interval: {}s)",
491 self.config.heartbeat_interval
492 );
493 Ok(())
494 }
495
496 pub fn send_sensory_data(&self, neuron_pairs: Vec<(i32, f64)>) -> Result<()> {
510 if !self.registered {
511 return Err(SdkError::NotRegistered);
512 }
513
514 let socket = self
515 .sensory_socket
516 .as_ref()
517 .ok_or_else(|| SdkError::Other("Sensory socket not initialized".to_string()))?;
518
519 use feagi_structures::genomic::cortical_area::CorticalID;
522 use feagi_structures::neuron_voxels::xyzp::{
523 CorticalMappedXYZPNeuronVoxels, NeuronVoxelXYZPArrays,
524 };
525
526 let vision_cap = self
528 .config
529 .capabilities
530 .vision
531 .as_ref()
532 .ok_or_else(|| SdkError::Other("No vision capability configured".to_string()))?;
533
534 let (width, _height) = vision_cap.dimensions;
535
536 let cortical_id = if let (Some(unit), Some(group_index)) =
538 (vision_cap.unit, vision_cap.group)
539 {
540 use feagi_structures::genomic::cortical_area::io_cortical_area_configuration_flag::PercentageNeuronPositioning;
541 use feagi_structures::genomic::SensoryCorticalUnit;
542
543 let group: feagi_structures::genomic::cortical_area::descriptors::CorticalUnitIndex =
544 group_index.into();
545 let frame_change_handling = feagi_structures::genomic::cortical_area::io_cortical_area_configuration_flag::FrameChangeHandling::Absolute;
546 let percentage_neuron_positioning = PercentageNeuronPositioning::Linear;
547
548 let sensory_unit = match unit {
549 feagi_io::SensoryUnit::Infrared => SensoryCorticalUnit::Infrared,
550 feagi_io::SensoryUnit::Proximity => SensoryCorticalUnit::Proximity,
551 feagi_io::SensoryUnit::Shock => SensoryCorticalUnit::Shock,
552 feagi_io::SensoryUnit::Battery => SensoryCorticalUnit::Battery,
553 feagi_io::SensoryUnit::Servo => SensoryCorticalUnit::Servo,
554 feagi_io::SensoryUnit::AnalogGpio => SensoryCorticalUnit::AnalogGPIO,
555 feagi_io::SensoryUnit::DigitalGpio => SensoryCorticalUnit::DigitalGPIO,
556 feagi_io::SensoryUnit::MiscData => SensoryCorticalUnit::MiscData,
557 feagi_io::SensoryUnit::TextEnglishInput => SensoryCorticalUnit::TextEnglishInput,
558 feagi_io::SensoryUnit::CountInput => SensoryCorticalUnit::CountInput,
559 feagi_io::SensoryUnit::Vision => SensoryCorticalUnit::Vision,
560 feagi_io::SensoryUnit::SegmentedVision => SensoryCorticalUnit::SegmentedVision,
561 feagi_io::SensoryUnit::Accelerometer => SensoryCorticalUnit::Accelerometer,
562 feagi_io::SensoryUnit::Gyroscope => SensoryCorticalUnit::Gyroscope,
563 };
564
565 match sensory_unit {
568 SensoryCorticalUnit::Infrared => {
569 SensoryCorticalUnit::get_cortical_ids_array_for_infrared_with_parameters(
570 frame_change_handling,
571 percentage_neuron_positioning,
572 group,
573 )[0]
574 }
575 SensoryCorticalUnit::Proximity => {
576 SensoryCorticalUnit::get_cortical_ids_array_for_proximity_with_parameters(
577 frame_change_handling,
578 percentage_neuron_positioning,
579 group,
580 )[0]
581 }
582 SensoryCorticalUnit::Shock => {
583 SensoryCorticalUnit::get_cortical_ids_array_for_shock_with_parameters(
584 frame_change_handling,
585 percentage_neuron_positioning,
586 group,
587 )[0]
588 }
589 SensoryCorticalUnit::Battery => {
590 SensoryCorticalUnit::get_cortical_ids_array_for_battery_with_parameters(
591 frame_change_handling,
592 percentage_neuron_positioning,
593 group,
594 )[0]
595 }
596 SensoryCorticalUnit::Servo => {
597 SensoryCorticalUnit::get_cortical_ids_array_for_servo_with_parameters(
598 frame_change_handling,
599 percentage_neuron_positioning,
600 group,
601 )[0]
602 }
603 SensoryCorticalUnit::AnalogGPIO => {
604 SensoryCorticalUnit::get_cortical_ids_array_for_analog_g_p_i_o_with_parameters(
605 frame_change_handling,
606 percentage_neuron_positioning,
607 group,
608 )[0]
609 }
610 SensoryCorticalUnit::DigitalGPIO => {
611 SensoryCorticalUnit::get_cortical_ids_array_for_digital_g_p_i_o_with_parameters(group)[0]
612 }
613 SensoryCorticalUnit::MiscData => {
614 SensoryCorticalUnit::get_cortical_ids_array_for_misc_data_with_parameters(
615 frame_change_handling,
616 group,
617 )[0]
618 }
619 SensoryCorticalUnit::TextEnglishInput => {
620 SensoryCorticalUnit::get_cortical_ids_array_for_text_english_input_with_parameters(
621 frame_change_handling,
622 group,
623 )[0]
624 }
625 SensoryCorticalUnit::CountInput => {
626 SensoryCorticalUnit::get_cortical_ids_array_for_count_input_with_parameters(
627 frame_change_handling,
628 percentage_neuron_positioning,
629 group,
630 )[0]
631 }
632 SensoryCorticalUnit::Vision => {
633 SensoryCorticalUnit::get_cortical_ids_array_for_vision_with_parameters(
634 frame_change_handling,
635 group,
636 )[0]
637 }
638 SensoryCorticalUnit::SegmentedVision => {
639 SensoryCorticalUnit::get_cortical_ids_array_for_segmented_vision_with_parameters(
640 frame_change_handling,
641 group,
642 )[0]
643 }
644 SensoryCorticalUnit::Accelerometer => {
645 SensoryCorticalUnit::get_cortical_ids_array_for_accelerometer_with_parameters(
646 frame_change_handling,
647 percentage_neuron_positioning,
648 group,
649 )[0]
650 }
651 SensoryCorticalUnit::Gyroscope => {
652 SensoryCorticalUnit::get_cortical_ids_array_for_gyroscope_with_parameters(
653 frame_change_handling,
654 percentage_neuron_positioning,
655 group,
656 )[0]
657 }
658 }
659 } else {
660 let cortical_area = &vision_cap.target_cortical_area;
661
662 let mut bytes = [b' '; 8];
664 let name_bytes = cortical_area.as_bytes();
665 let copy_len = name_bytes.len().min(8);
666 bytes[..copy_len].copy_from_slice(&name_bytes[..copy_len]);
667 CorticalID::try_from_bytes(&bytes).map_err(|e| {
668 SdkError::Other(format!("Invalid cortical ID '{}': {:?}", cortical_area, e))
669 })?
670 };
671
672 let mut x_coords = Vec::with_capacity(neuron_pairs.len());
674 let mut y_coords = Vec::with_capacity(neuron_pairs.len());
675 let mut z_coords = Vec::with_capacity(neuron_pairs.len());
676 let mut potentials = Vec::with_capacity(neuron_pairs.len());
677
678 for (neuron_id, potential) in neuron_pairs {
679 let neuron_id = neuron_id as u32;
680 x_coords.push(neuron_id % (width as u32));
681 y_coords.push(neuron_id / (width as u32));
682 z_coords.push(0); potentials.push(potential as f32);
684 }
685
686 let _neuron_count = x_coords.len(); let neuron_arrays =
690 NeuronVoxelXYZPArrays::new_from_vectors(x_coords, y_coords, z_coords, potentials)
691 .map_err(|e| SdkError::Other(format!("Failed to create neuron arrays: {:?}", e)))?;
692
693 let cortical_id_log = cortical_id.as_base_64();
695 let mut cortical_mapped = CorticalMappedXYZPNeuronVoxels::new();
696 cortical_mapped.insert(cortical_id, neuron_arrays);
697
698 let mut byte_container = feagi_serialization::FeagiByteContainer::new_empty();
700 byte_container
701 .overwrite_byte_data_with_single_struct_data(&cortical_mapped, 0)
702 .map_err(|e| SdkError::Other(format!("Failed to serialize to container: {:?}", e)))?;
703
704 let buffer = byte_container.get_byte_ref().to_vec();
705
706 socket.send(&buffer, 0)?;
708
709 debug!(
710 "[CLIENT] Sent {} bytes XYZP binary to {}",
711 buffer.len(),
712 cortical_id_log
713 );
714 Ok(())
715 }
716
717 pub fn send_sensory_bytes(&self, bytes: Vec<u8>) -> Result<()> {
726 let _ = self.try_send_sensory_bytes(&bytes)?;
727 Ok(())
728 }
729
730 pub fn try_send_sensory_bytes(&self, bytes: &[u8]) -> Result<bool> {
737 if !self.registered {
738 return Err(SdkError::NotRegistered);
739 }
740
741 let socket = self
742 .sensory_socket
743 .as_ref()
744 .ok_or_else(|| SdkError::Other("Sensory socket not initialized".to_string()))?;
745
746 match socket.send(bytes, zmq::DONTWAIT) {
747 Ok(()) => {
748 debug!("[CLIENT] Sent {} bytes sensory (raw)", bytes.len());
749 Ok(true)
750 }
751 Err(zmq::Error::EAGAIN) => {
752 static DROPPED: AtomicU64 = AtomicU64::new(0);
754 static LAST_LOG_MS: AtomicU64 = AtomicU64::new(0);
755
756 let dropped = DROPPED.fetch_add(1, Ordering::Relaxed) + 1;
757 let now_ms = SystemTime::now()
758 .duration_since(SystemTime::UNIX_EPOCH)
759 .unwrap_or_default()
760 .as_millis() as u64;
761
762 let last_ms = LAST_LOG_MS.load(Ordering::Relaxed);
763 if now_ms.saturating_sub(last_ms) >= 5_000
765 && LAST_LOG_MS
766 .compare_exchange(last_ms, now_ms, Ordering::Relaxed, Ordering::Relaxed)
767 .is_ok()
768 {
769 warn!(
770 "[CLIENT] Sensory backpressure: dropped_messages={} last_payload_bytes={}",
771 dropped,
772 bytes.len()
773 );
774 }
775
776 Ok(false)
777 }
778 Err(e) => Err(SdkError::Zmq(e)),
779 }
780 }
781
782 pub fn receive_motor_data(
799 &self,
800 ) -> Result<Option<feagi_structures::neuron_voxels::xyzp::CorticalMappedXYZPNeuronVoxels>> {
801 use feagi_structures::neuron_voxels::xyzp::CorticalMappedXYZPNeuronVoxels;
802
803 if !self.registered {
804 return Err(SdkError::NotRegistered);
805 }
806
807 let socket = self.motor_socket.as_ref().ok_or_else(|| {
808 error!("[CLIENT] receive_motor_data() called but motor_socket is None");
809 SdkError::Other("Motor socket not initialized (not a motor agent?)".to_string())
810 })?;
811
812 match socket.recv_bytes(zmq::DONTWAIT) {
816 Ok(first_frame) => {
817 trace!(
818 "[CLIENT] Received first motor frame: {} bytes",
819 first_frame.len()
820 );
821
822 let (_topic_opt, data) = if socket.get_rcvmore().map_err(SdkError::Zmq)? {
824 let topic = first_frame;
826 trace!(
827 "[CLIENT] Motor multipart topic: '{}'",
828 String::from_utf8_lossy(&topic)
829 );
830 trace!("[CLIENT] Receiving second frame (motor data)");
831 let data = socket.recv_bytes(0).map_err(|e| {
832 error!("[CLIENT] Failed to receive second frame: {}", e);
833 SdkError::Zmq(e)
834 })?;
835 trace!("[CLIENT] Received motor data frame: {} bytes", data.len());
836 (Some(topic), data)
837 } else {
838 (None, first_frame)
840 };
841
842 let mut byte_container = feagi_serialization::FeagiByteContainer::new_empty();
851 let mut data_vec = data.to_vec();
852
853 byte_container
855 .try_write_data_to_container_and_verify(&mut |bytes| {
856 std::mem::swap(bytes, &mut data_vec);
857 Ok(())
858 })
859 .map_err(|e| {
860 SdkError::Other(format!("Failed to load motor data bytes: {:?}", e))
861 })?;
862
863 let num_structures = byte_container
865 .try_get_number_contained_structures()
866 .map_err(|e| {
867 SdkError::Other(format!("Failed to get structure count: {:?}", e))
868 })?;
869
870 if num_structures == 0 {
871 return Ok(None);
872 }
873
874 let boxed_struct =
876 byte_container
877 .try_create_new_struct_from_index(0)
878 .map_err(|e| {
879 SdkError::Other(format!("Failed to extract motor structure: {:?}", e))
880 })?;
881
882 let motor_data = boxed_struct
884 .as_any()
885 .downcast_ref::<CorticalMappedXYZPNeuronVoxels>()
886 .ok_or_else(|| {
887 SdkError::Other(
888 "Motor data is not CorticalMappedXYZPNeuronVoxels".to_string(),
889 )
890 })?
891 .clone();
892
893 debug!(
894 "[CLIENT] ✓ Received motor data ({} bytes, {} areas)",
895 data.len(),
896 motor_data.len()
897 );
898 Ok(Some(motor_data))
899 }
900 Err(zmq::Error::EAGAIN) => {
901 Ok(None)
903 }
904 Err(e) => {
905 error!("[CLIENT] ❌ ZMQ error on motor receive: {}", e);
906 Err(SdkError::Zmq(e))
907 }
908 }
909 }
910
911 pub fn receive_visualization_data(&self) -> Result<Option<Vec<u8>>> {
923 if !self.registered {
924 return Err(SdkError::NotRegistered);
925 }
926
927 let socket = self.viz_socket.as_ref().ok_or_else(|| {
928 SdkError::Other(
929 "Visualization socket not initialized (not a visualization/infrastructure agent?)"
930 .to_string(),
931 )
932 })?;
933
934 match socket.recv_bytes(zmq::DONTWAIT) {
936 Ok(data) => {
937 debug!(
938 "[CLIENT] ✓ Received visualization data ({} bytes)",
939 data.len()
940 );
941 Ok(Some(data))
942 }
943 Err(zmq::Error::EAGAIN) => Ok(None), Err(e) => Err(SdkError::Zmq(e)),
945 }
946 }
947
948 pub fn control_request(
965 &self,
966 method: &str,
967 route: &str,
968 data: Option<serde_json::Value>,
969 ) -> Result<serde_json::Value> {
970 if !self.registered {
971 return Err(SdkError::NotRegistered);
972 }
973
974 let socket = self.control_socket.as_ref().ok_or_else(|| {
975 SdkError::Other(
976 "Control socket not initialized (not an infrastructure agent?)".to_string(),
977 )
978 })?;
979
980 let mut request = serde_json::json!({
982 "method": method,
983 "route": route,
984 "headers": {"content-type": "application/json"},
985 });
986
987 if let Some(body) = data {
988 request["body"] = body;
989 }
990
991 socket.send(request.to_string().as_bytes(), 0)?;
993
994 let response_bytes = socket.recv_bytes(0)?;
996 let response: serde_json::Value = serde_json::from_slice(&response_bytes)?;
997
998 debug!("[CLIENT] ✓ Control request {} {} completed", method, route);
999 Ok(response)
1000 }
1001
1002 pub fn is_registered(&self) -> bool {
1004 self.registered
1005 }
1006
1007 pub fn agent_id(&self) -> &str {
1009 &self.config.agent_id
1010 }
1011}
1012
1013impl Drop for AgentClient {
1014 fn drop(&mut self) {
1015 debug!("[CLIENT] Dropping AgentClient: {}", self.config.agent_id);
1016
1017 if let Some(mut heartbeat) = self.heartbeat.take() {
1019 debug!("[CLIENT] Stopping heartbeat service before cleanup");
1020 heartbeat.stop();
1021 debug!("[CLIENT] Heartbeat service stopped");
1022 }
1023
1024 if self.registered {
1026 debug!("[CLIENT] Deregistering agent: {}", self.config.agent_id);
1027 if let Err(e) = self.deregister() {
1028 warn!("[CLIENT] Deregistration failed during drop: {}", e);
1029 }
1031 }
1032
1033 debug!(
1035 "[CLIENT] AgentClient dropped cleanly: {}",
1036 self.config.agent_id
1037 );
1038 }
1039}
1040
1041#[cfg(test)]
1042mod tests {
1043 use super::*;
1044 use feagi_io::AgentType;
1045
1046 #[test]
1047 fn test_client_creation() {
1048 let config = AgentConfig::new("test_agent", AgentType::Sensory)
1049 .with_vision_capability("camera", (640, 480), 3, "i_vision")
1050 .with_registration_endpoint("tcp://localhost:8000")
1051 .with_sensory_endpoint("tcp://localhost:5558");
1052
1053 let client = AgentClient::new(config);
1054 assert!(client.is_ok());
1055
1056 let client = client.unwrap();
1057 assert!(!client.is_registered());
1058 assert_eq!(client.agent_id(), "test_agent");
1059 }
1060
1061 }