1use crate::core::config::AgentConfig;
7use crate::core::error::{Result, SdkError};
8use crate::core::heartbeat::HeartbeatService;
9use crate::core::reconnect::{retry_with_backoff, ReconnectionStrategy};
10use feagi_io::AgentType;
11use futures::FutureExt;
12use std::future::Future;
13use std::net::{TcpStream, ToSocketAddrs};
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::{Arc, Mutex};
16use std::time::Duration;
17use std::time::SystemTime;
18use tokio::runtime::Handle;
19use tokio::runtime::Runtime;
20use tokio::task::block_in_place;
21use tokio::time::timeout;
22use tracing::{debug, error, info, trace, warn};
23use zeromq::{
24 DealerSocket, PushSocket, Socket, SocketRecv, SocketSend, SubSocket, ZmqError, ZmqMessage,
25};
26
27fn block_on_with<T>(
28 handle: &Handle,
29 runtime: Option<&Runtime>,
30 future: impl Future<Output = T>,
31) -> T {
32 if Handle::try_current().is_ok() {
33 block_in_place(|| handle.block_on(future))
34 } else if let Some(runtime) = runtime {
35 runtime.block_on(future)
36 } else {
37 handle.block_on(future)
38 }
39}
40
/// Synchronous client that connects an agent to FEAGI over ZeroMQ sockets.
///
/// Async socket operations are driven through a Tokio runtime handle; see
/// `AgentClient::new` for how that handle is obtained.
pub struct AgentClient {
    /// Agent configuration: endpoints, retry/timeout settings, agent type and capabilities.
    config: AgentConfig,

    /// Handle used to run async socket operations from the synchronous API.
    runtime_handle: Handle,

    /// Runtime created by `new` when no ambient Tokio runtime was found; `None` otherwise.
    runtime: Option<Arc<Runtime>>,

    /// DEALER socket connected to the registration endpoint; also shared with the heartbeat service.
    registration_socket: Option<Arc<Mutex<DealerSocket>>>,

    /// PUSH socket connected to the sensory endpoint.
    sensory_socket: Option<Arc<Mutex<PushSocket>>>,

    /// SUB socket for motor data; only created for `Motor` and `Both` agents.
    motor_socket: Option<Arc<Mutex<SubSocket>>>,

    /// SUB socket for visualization data; only created for `Visualization` and `Infrastructure` agents.
    viz_socket: Option<Arc<Mutex<SubSocket>>>,

    /// DEALER socket connected to the control endpoint; only created for `Infrastructure` agents.
    control_socket: Option<Arc<Mutex<DealerSocket>>>,

    /// Running heartbeat service, if one has been started.
    heartbeat: Option<HeartbeatService>,

    /// Set once a registration request returns status 200; cleared by deregistration.
    registered: bool,

    /// Wall-clock time (ms since the Unix epoch) of the last data-stream reconnect attempt;
    /// used to rate-limit `reconnect_data_streams`.
    last_reconnect_attempt_ms: AtomicU64,

    /// Set after the first successful timed sensory send; reset whenever the data
    /// sockets are closed or (re)connected.
    sensory_connected: AtomicBool,

    /// `body` of the most recent successful registration response; cleared on a failed one.
    last_registration_body: Option<serde_json::Value>,
}
112
113impl AgentClient {
114 pub fn new(config: AgentConfig) -> Result<Self> {
120 config.validate()?;
122
123 let (runtime_handle, runtime) = if let Ok(handle) = Handle::try_current() {
124 (handle, None)
125 } else {
126 let runtime = Runtime::new()
127 .map_err(|e| SdkError::Other(format!("Failed to create runtime: {}", e)))?;
128 let handle = runtime.handle().clone();
129 (handle, Some(Arc::new(runtime)))
130 };
131
132 Ok(Self {
133 config,
134 runtime_handle,
135 runtime,
136 registration_socket: None,
137 sensory_socket: None,
138 motor_socket: None,
139 viz_socket: None,
140 control_socket: None,
141 heartbeat: None,
142 registered: false,
143 last_registration_body: None,
144 last_reconnect_attempt_ms: AtomicU64::new(0),
145 sensory_connected: AtomicBool::new(false),
146 })
147 }
148
149 pub fn registration_body_json(&self) -> Option<&serde_json::Value> {
153 self.last_registration_body.as_ref()
154 }
155
156 pub fn connect(&mut self) -> Result<()> {
172 if self.registered {
173 return Err(SdkError::AlreadyConnected);
174 }
175
176 info!(
177 "[CLIENT] Step 0: starting connection sequence to FEAGI: {}",
178 self.config.registration_endpoint
179 );
180
181 info!("[CLIENT] Step 1: creating control sockets (registration)...");
183 let mut socket_strategy = ReconnectionStrategy::new(
184 self.config.retry_backoff_ms,
185 self.config.registration_retries,
186 );
187 retry_with_backoff(
188 || self.create_sockets(),
189 &mut socket_strategy,
190 "Socket creation",
191 )?;
192 info!("[CLIENT] Step 1: control sockets created");
193
194 info!("[CLIENT] Step 2: registering with FEAGI...");
196 let mut reg_strategy = ReconnectionStrategy::new(
197 self.config.retry_backoff_ms,
198 self.config.registration_retries,
199 );
200 retry_with_backoff(|| self.register(), &mut reg_strategy, "Registration")?;
201 info!("[CLIENT] Step 2: registration successful");
202
203 info!("[CLIENT] Step 3: connecting to data streams with retry...");
207 info!(
208 "[CLIENT] sensory endpoint: {}",
209 self.config.sensory_endpoint
210 );
211 if matches!(self.config.agent_type, AgentType::Motor | AgentType::Both) {
212 info!("[CLIENT] motor endpoint: {}", self.config.motor_endpoint);
213 }
214 let mut data_socket_strategy = ReconnectionStrategy::new(
215 self.config.retry_backoff_ms,
216 self.config.registration_retries,
217 );
218 retry_with_backoff(
219 || self.connect_data_sockets(),
220 &mut data_socket_strategy,
221 "Data socket creation",
222 )?;
223 info!("[CLIENT] Step 3: data sockets connected");
224
225 info!("[CLIENT] Step 4: starting heartbeat service...");
227 if self.config.heartbeat_interval > 0.0 {
228 self.start_heartbeat()?;
229 info!("[CLIENT] Step 4: heartbeat service started");
230 } else {
231 info!("[CLIENT] Step 4: heartbeat disabled (interval = 0)");
232 }
233
234 info!(
235 "[CLIENT] fully connected to FEAGI as agent: {}",
236 self.config.agent_id
237 );
238 Ok(())
239 }
240
241 fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
242 block_on_with(&self.runtime_handle, self.runtime.as_deref(), future)
243 }
244
245 fn create_sockets(&mut self) -> Result<()> {
247 let mut reg_socket = DealerSocket::new();
249 self.block_on(reg_socket.connect(&self.config.registration_endpoint))
250 .map_err(SdkError::Zmq)?;
251 self.registration_socket = Some(Arc::new(Mutex::new(reg_socket)));
252
253 if matches!(self.config.agent_type, AgentType::Infrastructure) {
255 let mut control_socket = DealerSocket::new();
256 self.block_on(control_socket.connect(&self.config.control_endpoint))
257 .map_err(SdkError::Zmq)?;
258 self.control_socket = Some(Arc::new(Mutex::new(control_socket)));
259 debug!("[CLIENT] ✓ Control/API socket created");
260 }
261
262 debug!("[CLIENT] ✓ ZMQ control sockets created");
263 Ok(())
264 }
265
266 fn connect_data_sockets(&mut self) -> Result<()> {
268 self.sensory_connected.store(false, Ordering::Relaxed);
270 self.wait_for_tcp_endpoint("sensory", &self.config.sensory_endpoint)?;
271 let mut sensory_socket = PushSocket::new();
272 self.block_on(sensory_socket.connect(&self.config.sensory_endpoint))
273 .map_err(SdkError::Zmq)?;
274 self.sensory_socket = Some(Arc::new(Mutex::new(sensory_socket)));
275
276 if matches!(self.config.agent_type, AgentType::Motor | AgentType::Both) {
278 info!(
279 "[SDK-CONNECT] 🎮 Initializing motor socket for agent '{}' (type: {:?})",
280 self.config.agent_id, self.config.agent_type
281 );
282 info!(
283 "[SDK-CONNECT] 🎮 Motor endpoint: {}",
284 self.config.motor_endpoint
285 );
286
287 self.wait_for_tcp_endpoint("motor", &self.config.motor_endpoint)?;
288 let mut motor_socket = SubSocket::new();
289 self.block_on(motor_socket.connect(&self.config.motor_endpoint))
290 .map_err(SdkError::Zmq)?;
291 info!("[SDK-CONNECT] ✅ Motor socket connected");
292
293 info!("[SDK-CONNECT] 🎮 Subscribing to all motor topics");
303 self.block_on(motor_socket.subscribe(""))
304 .map_err(SdkError::Zmq)?;
305 info!("[SDK-CONNECT] ✅ Motor subscription set (all topics)");
306
307 self.motor_socket = Some(Arc::new(Mutex::new(motor_socket)));
308 info!("[SDK-CONNECT] ✅ Motor socket initialized successfully");
309 } else {
310 info!(
311 "[SDK-CONNECT] ⚠️ Motor socket NOT initialized (agent type: {:?})",
312 self.config.agent_type
313 );
314 }
315
316 if matches!(
318 self.config.agent_type,
319 AgentType::Visualization | AgentType::Infrastructure
320 ) {
321 self.wait_for_tcp_endpoint("visualization", &self.config.visualization_endpoint)?;
322 let mut viz_socket = SubSocket::new();
323 self.block_on(viz_socket.connect(&self.config.visualization_endpoint))
324 .map_err(SdkError::Zmq)?;
325
326 self.block_on(viz_socket.subscribe(""))
328 .map_err(SdkError::Zmq)?;
329 self.viz_socket = Some(Arc::new(Mutex::new(viz_socket)));
330 debug!("[CLIENT] ✓ Visualization socket created");
331 }
332
333 debug!("[CLIENT] ✓ ZMQ data sockets created");
334 Ok(())
335 }
336
337 pub fn reconnect_data_streams(&mut self) -> Result<()> {
339 if !self.registered {
340 return Err(SdkError::NotRegistered);
341 }
342
343 let now_ms = SystemTime::now()
344 .duration_since(SystemTime::UNIX_EPOCH)
345 .unwrap_or_default()
346 .as_millis() as u64;
347 let last_ms = self.last_reconnect_attempt_ms.load(Ordering::Relaxed);
348 let min_interval_ms = self.config.retry_backoff_ms.max(1);
349 if now_ms.saturating_sub(last_ms) < min_interval_ms {
350 return Ok(());
351 }
352 self.last_reconnect_attempt_ms
353 .store(now_ms, Ordering::Relaxed);
354
355 self.close_data_sockets();
356
357 let mut data_socket_strategy = ReconnectionStrategy::new(
358 self.config.retry_backoff_ms,
359 self.config.registration_retries,
360 );
361 retry_with_backoff(
362 || self.connect_data_sockets(),
363 &mut data_socket_strategy,
364 "Data socket reconnection",
365 )
366 }
367
368 fn close_data_sockets(&mut self) {
369 self.sensory_connected.store(false, Ordering::Relaxed);
370 if let Some(socket) = self.sensory_socket.take() {
371 if let Ok(socket) = Arc::try_unwrap(socket) {
372 match socket.into_inner() {
373 Ok(socket) => {
374 let _ = self.block_on(socket.close());
375 }
376 Err(poisoned) => {
377 let _ = self.block_on(poisoned.into_inner().close());
378 }
379 }
380 }
381 }
382 if let Some(socket) = self.motor_socket.take() {
383 if let Ok(socket) = Arc::try_unwrap(socket) {
384 match socket.into_inner() {
385 Ok(socket) => {
386 let _ = self.block_on(socket.close());
387 }
388 Err(poisoned) => {
389 let _ = self.block_on(poisoned.into_inner().close());
390 }
391 }
392 }
393 }
394 if let Some(socket) = self.viz_socket.take() {
395 if let Ok(socket) = Arc::try_unwrap(socket) {
396 match socket.into_inner() {
397 Ok(socket) => {
398 let _ = self.block_on(socket.close());
399 }
400 Err(poisoned) => {
401 let _ = self.block_on(poisoned.into_inner().close());
402 }
403 }
404 }
405 }
406 }
407
408 fn wait_for_tcp_endpoint(&self, label: &str, endpoint: &str) -> Result<()> {
409 let address = endpoint
410 .strip_prefix("tcp://")
411 .ok_or_else(|| {
412 SdkError::InvalidConfig(format!("Invalid {} endpoint: {}", label, endpoint))
413 })?
414 .to_string();
415 let timeout = Duration::from_millis(self.config.connection_timeout_ms);
416
417 info!("[CLIENT] checking {} endpoint: {}", label, endpoint);
418 let mut strategy = ReconnectionStrategy::new(
419 self.config.retry_backoff_ms,
420 self.config.registration_retries,
421 );
422 retry_with_backoff(
423 || {
424 let mut addrs = address.to_socket_addrs().map_err(|e| {
425 SdkError::InvalidConfig(format!(
426 "Failed to resolve {} endpoint {}: {}",
427 label, endpoint, e
428 ))
429 })?;
430 let addr = addrs.next().ok_or_else(|| {
431 SdkError::InvalidConfig(format!(
432 "No resolved addresses for {} endpoint {}",
433 label, endpoint
434 ))
435 })?;
436 TcpStream::connect_timeout(&addr, timeout).map_err(|e| {
437 SdkError::Timeout(format!(
438 "{} endpoint {} not reachable: {}",
439 label, endpoint, e
440 ))
441 })?;
442 info!("[CLIENT] ✅ {} endpoint is reachable", label);
443 Ok(())
444 },
445 &mut strategy,
446 &format!("{} endpoint availability", label),
447 )
448 }
449
450 fn send_request(
451 &self,
452 socket: &Arc<Mutex<DealerSocket>>,
453 request: &serde_json::Value,
454 ) -> Result<serde_json::Value> {
455 let mut socket = socket
456 .lock()
457 .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
458
459 let message = ZmqMessage::from(request.to_string().into_bytes());
460 self.block_on(socket.send(message)).map_err(SdkError::Zmq)?;
461
462 let timeout_ms = self.config.connection_timeout_ms;
463 let recv_result = if timeout_ms > 0 {
464 self.block_on(async {
465 timeout(std::time::Duration::from_millis(timeout_ms), socket.recv()).await
466 })
467 .map_err(|_| SdkError::Timeout(format!("Request timed out after {}ms", timeout_ms)))?
468 .map_err(SdkError::Zmq)?
469 } else {
470 self.block_on(socket.recv()).map_err(SdkError::Zmq)?
471 };
472
473 let frames = recv_result.into_vec();
474 let payload = frames
475 .last()
476 .ok_or_else(|| SdkError::Other("Response was empty".to_string()))?;
477 let response: serde_json::Value = serde_json::from_slice(payload)?;
478 Ok(response)
479 }
480
481 fn register(&mut self) -> Result<()> {
483 let registration_msg = serde_json::json!({
484 "method": "POST",
485 "path": "/v1/agent/register",
486 "body": {
487 "agent_id": self.config.agent_id,
488 "agent_type": match self.config.agent_type {
489 AgentType::Sensory => "sensory",
490 AgentType::Motor => "motor",
491 AgentType::Both => "both",
492 AgentType::Visualization => "visualization",
493 AgentType::Infrastructure => "infrastructure",
494 },
495 "capabilities": self.config.capabilities,
496 }
497 });
498
499 let socket = self
500 .registration_socket
501 .as_ref()
502 .ok_or_else(|| SdkError::Other("Registration socket not initialized".to_string()))?;
503
504 debug!(
505 "[CLIENT] Sending registration request for: {}",
506 self.config.agent_id
507 );
508 let response = self.send_request(socket, ®istration_msg)?;
509
510 let status_code = response
512 .get("status")
513 .and_then(|s| s.as_u64())
514 .unwrap_or(500);
515 if status_code == 200 {
516 self.registered = true;
517 let empty_body = serde_json::json!({});
519 let body = response.get("body").unwrap_or(&empty_body);
520 self.last_registration_body = Some(body.clone());
521 info!("[CLIENT] ✓ Registration successful: {:?}", response);
522 Ok(())
523 } else {
524 let empty_body = serde_json::json!({});
525 let body = response.get("body").unwrap_or(&empty_body);
526 let message = body
527 .get("error")
528 .and_then(|m| m.as_str())
529 .unwrap_or("Unknown error");
530 self.last_registration_body = None;
532
533 if message.contains("already registered") {
535 warn!("[CLIENT] ⚠ Agent already registered - attempting deregistration and retry");
536 self.deregister()?;
537
538 info!("[CLIENT] Retrying registration after deregistration...");
540 std::thread::sleep(std::time::Duration::from_millis(100)); self.register_with_retry_once()
544 } else {
545 error!("[CLIENT] ✗ Registration failed: {}", message);
546 Err(SdkError::RegistrationFailed(message.to_string()))
547 }
548 }
549 }
550
551 fn register_with_retry_once(&mut self) -> Result<()> {
553 let registration_msg = serde_json::json!({
554 "method": "POST",
555 "path": "/v1/agent/register",
556 "body": {
557 "agent_id": self.config.agent_id,
558 "agent_type": match self.config.agent_type {
559 AgentType::Sensory => "sensory",
560 AgentType::Motor => "motor",
561 AgentType::Both => "both",
562 AgentType::Visualization => "visualization",
563 AgentType::Infrastructure => "infrastructure",
564 },
565 "capabilities": self.config.capabilities,
566 }
567 });
568
569 let socket = self
570 .registration_socket
571 .as_ref()
572 .ok_or_else(|| SdkError::Other("Registration socket not initialized".to_string()))?;
573
574 debug!(
575 "[CLIENT] Sending registration request (retry) for: {}",
576 self.config.agent_id
577 );
578 let response = self.send_request(socket, ®istration_msg)?;
579
580 let status_code = response
582 .get("status")
583 .and_then(|s| s.as_u64())
584 .unwrap_or(500);
585 if status_code == 200 {
586 self.registered = true;
587 let empty_body = serde_json::json!({});
589 let body = response.get("body").unwrap_or(&empty_body);
590 self.last_registration_body = Some(body.clone());
591 info!(
592 "[CLIENT] ✓ Registration successful (after retry): {:?}",
593 response
594 );
595 Ok(())
596 } else {
597 let empty_body = serde_json::json!({});
598 let body = response.get("body").unwrap_or(&empty_body);
599 let message = body
600 .get("error")
601 .and_then(|m| m.as_str())
602 .unwrap_or("Unknown error");
603 self.last_registration_body = None;
604 error!("[CLIENT] ✗ Registration retry failed: {}", message);
605 Err(SdkError::RegistrationFailed(message.to_string()))
606 }
607 }
608
609 fn deregister(&mut self) -> Result<()> {
611 if !self.registered && self.registration_socket.is_none() {
612 return Ok(()); }
614
615 info!("[CLIENT] Deregistering agent: {}", self.config.agent_id);
616
617 let deregistration_msg = serde_json::json!({
618 "method": "DELETE",
619 "path": "/v1/agent/deregister",
620 "body": {
621 "agent_id": self.config.agent_id,
622 }
623 });
624
625 if let Some(socket) = &self.registration_socket {
626 match self.send_request(socket, &deregistration_msg) {
627 Ok(response) => {
628 if response.get("status").and_then(|s| s.as_u64()) == Some(200) {
629 info!("[CLIENT] ✓ Deregistration successful");
630 } else {
631 warn!("[CLIENT] ⚠ Deregistration returned: {:?}", response);
632 }
633 }
634 Err(e) => {
635 warn!("[CLIENT] ⚠ Deregistration timeout/error: {}", e);
636 }
637 };
638 }
639
640 self.registered = false;
641 Ok(())
642 }
643
644 fn start_heartbeat(&mut self) -> Result<()> {
646 if self.heartbeat.is_some() {
647 return Ok(());
648 }
649
650 let socket = self
651 .registration_socket
652 .as_ref()
653 .ok_or_else(|| SdkError::Other("Registration socket not initialized".to_string()))?;
654
655 let agent_type = match self.config.agent_type {
656 AgentType::Sensory => "sensory",
657 AgentType::Motor => "motor",
658 AgentType::Both => "both",
659 AgentType::Visualization => "visualization",
660 AgentType::Infrastructure => "infrastructure",
661 }
662 .to_string();
663 let capabilities = serde_json::to_value(&self.config.capabilities)
664 .map_err(|e| SdkError::Other(format!("Failed to serialize capabilities: {e}")))?;
665
666 let reconnect_spec = crate::core::heartbeat::ReconnectSpec {
667 agent_id: self.config.agent_id.clone(),
668 agent_type,
669 capabilities,
670 registration_retries: self.config.registration_retries,
671 retry_backoff_ms: self.config.retry_backoff_ms,
672 };
673
674 let mut heartbeat = HeartbeatService::new(
675 self.config.agent_id.clone(),
676 Arc::clone(socket),
677 self.runtime_handle.clone(),
678 self.runtime.clone(),
679 self.config.heartbeat_interval,
680 )
681 .with_reconnect_spec(reconnect_spec);
682
683 heartbeat.start()?;
684 self.heartbeat = Some(heartbeat);
685
686 debug!(
687 "[CLIENT] ✓ Heartbeat service started (interval: {}s)",
688 self.config.heartbeat_interval
689 );
690 Ok(())
691 }
692
693 pub fn send_sensory_data(&self, neuron_pairs: Vec<(i32, f64)>) -> Result<()> {
707 if !self.registered {
708 return Err(SdkError::NotRegistered);
709 }
710
711 let socket = self
712 .sensory_socket
713 .as_ref()
714 .ok_or_else(|| SdkError::Other("Sensory socket not initialized".to_string()))?;
715
716 use feagi_structures::genomic::cortical_area::CorticalID;
719 use feagi_structures::neuron_voxels::xyzp::{
720 CorticalMappedXYZPNeuronVoxels, NeuronVoxelXYZPArrays,
721 };
722
723 let vision_cap = self
725 .config
726 .capabilities
727 .vision
728 .as_ref()
729 .ok_or_else(|| SdkError::Other("No vision capability configured".to_string()))?;
730
731 let (width, _height) = vision_cap.dimensions;
732
733 let cortical_id = if let (Some(unit), Some(group_index)) =
735 (vision_cap.unit, vision_cap.group)
736 {
737 use feagi_structures::genomic::cortical_area::io_cortical_area_configuration_flag::PercentageNeuronPositioning;
738 use feagi_structures::genomic::SensoryCorticalUnit;
739
740 let group: feagi_structures::genomic::cortical_area::descriptors::CorticalUnitIndex =
741 group_index.into();
742 let frame_change_handling = feagi_structures::genomic::cortical_area::io_cortical_area_configuration_flag::FrameChangeHandling::Absolute;
743 let percentage_neuron_positioning = PercentageNeuronPositioning::Linear;
744
745 let sensory_unit = match unit {
746 feagi_io::SensoryUnit::Infrared => SensoryCorticalUnit::Infrared,
747 feagi_io::SensoryUnit::Proximity => SensoryCorticalUnit::Proximity,
748 feagi_io::SensoryUnit::Shock => SensoryCorticalUnit::Shock,
749 feagi_io::SensoryUnit::Battery => SensoryCorticalUnit::Battery,
750 feagi_io::SensoryUnit::Servo => SensoryCorticalUnit::Servo,
751 feagi_io::SensoryUnit::AnalogGpio => SensoryCorticalUnit::AnalogGPIO,
752 feagi_io::SensoryUnit::DigitalGpio => SensoryCorticalUnit::DigitalGPIO,
753 feagi_io::SensoryUnit::MiscData => SensoryCorticalUnit::MiscData,
754 feagi_io::SensoryUnit::TextEnglishInput => SensoryCorticalUnit::TextEnglishInput,
755 feagi_io::SensoryUnit::CountInput => SensoryCorticalUnit::CountInput,
756 feagi_io::SensoryUnit::Vision => SensoryCorticalUnit::Vision,
757 feagi_io::SensoryUnit::SegmentedVision => SensoryCorticalUnit::SegmentedVision,
758 feagi_io::SensoryUnit::Accelerometer => SensoryCorticalUnit::Accelerometer,
759 feagi_io::SensoryUnit::Gyroscope => SensoryCorticalUnit::Gyroscope,
760 };
761
762 match sensory_unit {
765 SensoryCorticalUnit::Infrared => {
766 SensoryCorticalUnit::get_cortical_ids_array_for_infrared_with_parameters(
767 frame_change_handling,
768 percentage_neuron_positioning,
769 group,
770 )[0]
771 }
772 SensoryCorticalUnit::Proximity => {
773 SensoryCorticalUnit::get_cortical_ids_array_for_proximity_with_parameters(
774 frame_change_handling,
775 percentage_neuron_positioning,
776 group,
777 )[0]
778 }
779 SensoryCorticalUnit::Shock => {
780 SensoryCorticalUnit::get_cortical_ids_array_for_shock_with_parameters(
781 frame_change_handling,
782 percentage_neuron_positioning,
783 group,
784 )[0]
785 }
786 SensoryCorticalUnit::Battery => {
787 SensoryCorticalUnit::get_cortical_ids_array_for_battery_with_parameters(
788 frame_change_handling,
789 percentage_neuron_positioning,
790 group,
791 )[0]
792 }
793 SensoryCorticalUnit::Servo => {
794 SensoryCorticalUnit::get_cortical_ids_array_for_servo_with_parameters(
795 frame_change_handling,
796 percentage_neuron_positioning,
797 group,
798 )[0]
799 }
800 SensoryCorticalUnit::AnalogGPIO => {
801 SensoryCorticalUnit::get_cortical_ids_array_for_analog_g_p_i_o_with_parameters(
802 frame_change_handling,
803 percentage_neuron_positioning,
804 group,
805 )[0]
806 }
807 SensoryCorticalUnit::DigitalGPIO => {
808 SensoryCorticalUnit::get_cortical_ids_array_for_digital_g_p_i_o_with_parameters(group)[0]
809 }
810 SensoryCorticalUnit::MiscData => {
811 SensoryCorticalUnit::get_cortical_ids_array_for_misc_data_with_parameters(
812 frame_change_handling,
813 group,
814 )[0]
815 }
816 SensoryCorticalUnit::TextEnglishInput => {
817 SensoryCorticalUnit::get_cortical_ids_array_for_text_english_input_with_parameters(
818 frame_change_handling,
819 group,
820 )[0]
821 }
822 SensoryCorticalUnit::CountInput => {
823 SensoryCorticalUnit::get_cortical_ids_array_for_count_input_with_parameters(
824 frame_change_handling,
825 percentage_neuron_positioning,
826 group,
827 )[0]
828 }
829 SensoryCorticalUnit::Vision => {
830 SensoryCorticalUnit::get_cortical_ids_array_for_vision_with_parameters(
831 frame_change_handling,
832 group,
833 )[0]
834 }
835 SensoryCorticalUnit::SegmentedVision => {
836 SensoryCorticalUnit::get_cortical_ids_array_for_segmented_vision_with_parameters(
837 frame_change_handling,
838 group,
839 )[0]
840 }
841 SensoryCorticalUnit::Accelerometer => {
842 SensoryCorticalUnit::get_cortical_ids_array_for_accelerometer_with_parameters(
843 frame_change_handling,
844 percentage_neuron_positioning,
845 group,
846 )[0]
847 }
848 SensoryCorticalUnit::Gyroscope => {
849 SensoryCorticalUnit::get_cortical_ids_array_for_gyroscope_with_parameters(
850 frame_change_handling,
851 percentage_neuron_positioning,
852 group,
853 )[0]
854 }
855 }
856 } else {
857 let cortical_area = &vision_cap.target_cortical_area;
858
859 let mut bytes = [b' '; 8];
861 let name_bytes = cortical_area.as_bytes();
862 let copy_len = name_bytes.len().min(8);
863 bytes[..copy_len].copy_from_slice(&name_bytes[..copy_len]);
864 CorticalID::try_from_bytes(&bytes).map_err(|e| {
865 SdkError::Other(format!("Invalid cortical ID '{}': {:?}", cortical_area, e))
866 })?
867 };
868
869 let mut x_coords = Vec::with_capacity(neuron_pairs.len());
871 let mut y_coords = Vec::with_capacity(neuron_pairs.len());
872 let mut z_coords = Vec::with_capacity(neuron_pairs.len());
873 let mut potentials = Vec::with_capacity(neuron_pairs.len());
874
875 for (neuron_id, potential) in neuron_pairs {
876 let neuron_id = neuron_id as u32;
877 x_coords.push(neuron_id % (width as u32));
878 y_coords.push(neuron_id / (width as u32));
879 z_coords.push(0); potentials.push(potential as f32);
881 }
882
883 let _neuron_count = x_coords.len(); let neuron_arrays =
887 NeuronVoxelXYZPArrays::new_from_vectors(x_coords, y_coords, z_coords, potentials)
888 .map_err(|e| SdkError::Other(format!("Failed to create neuron arrays: {:?}", e)))?;
889
890 let cortical_id_log = cortical_id.as_base_64();
892 let mut cortical_mapped = CorticalMappedXYZPNeuronVoxels::new();
893 cortical_mapped.insert(cortical_id, neuron_arrays);
894
895 let mut byte_container = feagi_serialization::FeagiByteContainer::new_empty();
897 byte_container
898 .overwrite_byte_data_with_single_struct_data(&cortical_mapped, 0)
899 .map_err(|e| SdkError::Other(format!("Failed to serialize to container: {:?}", e)))?;
900
901 let buffer = byte_container.get_byte_ref().to_vec();
902
903 let mut socket = socket
905 .lock()
906 .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
907 let message = ZmqMessage::from(buffer.clone());
908 self.block_on(socket.send(message)).map_err(SdkError::Zmq)?;
909
910 debug!(
911 "[CLIENT] Sent {} bytes XYZP binary to {}",
912 buffer.len(),
913 cortical_id_log
914 );
915 Ok(())
916 }
917
918 pub fn send_sensory_bytes(&self, bytes: Vec<u8>) -> Result<()> {
927 let _ = self.try_send_sensory_bytes(&bytes)?;
928 Ok(())
929 }
930
931 pub fn try_send_sensory_bytes(&self, bytes: &[u8]) -> Result<bool> {
938 if !self.registered {
939 return Err(SdkError::NotRegistered);
940 }
941
942 let socket = self
943 .sensory_socket
944 .as_ref()
945 .ok_or_else(|| SdkError::Other("Sensory socket not initialized".to_string()))?;
946
947 let mut socket = socket
948 .lock()
949 .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
950 let message = ZmqMessage::from(bytes.to_vec());
951
952 if !self.sensory_connected.load(Ordering::Relaxed) {
953 let timeout_ms = self.config.connection_timeout_ms;
954 let send_result = self.block_on(async {
955 timeout(
956 std::time::Duration::from_millis(timeout_ms),
957 socket.send(message),
958 )
959 .await
960 });
961 match send_result {
962 Ok(Ok(())) => {
963 self.sensory_connected.store(true, Ordering::Relaxed);
964 debug!("[CLIENT] Sensory socket connected (first send)");
965 return Ok(true);
966 }
967 Ok(Err(ZmqError::BufferFull(_))) => return Ok(false),
968 Ok(Err(e)) => {
969 let message = e.to_string();
970 if message.contains("Not connected to peers") {
971 return Ok(false);
972 }
973 return Err(SdkError::Zmq(e));
974 }
975 Err(_) => return Ok(false),
976 }
977 }
978
979 let send_result = self.block_on(async { socket.send(message).now_or_never() });
980 match send_result {
981 Some(Ok(())) => {
982 debug!("[CLIENT] Sent {} bytes sensory (raw)", bytes.len());
983 Ok(true)
984 }
985 None | Some(Err(ZmqError::BufferFull(_))) => {
986 static DROPPED: AtomicU64 = AtomicU64::new(0);
988 static LAST_LOG_MS: AtomicU64 = AtomicU64::new(0);
989
990 let dropped = DROPPED.fetch_add(1, Ordering::Relaxed) + 1;
991 let now_ms = SystemTime::now()
992 .duration_since(SystemTime::UNIX_EPOCH)
993 .unwrap_or_default()
994 .as_millis() as u64;
995
996 let last_ms = LAST_LOG_MS.load(Ordering::Relaxed);
997 if now_ms.saturating_sub(last_ms) >= 5_000
999 && LAST_LOG_MS
1000 .compare_exchange(last_ms, now_ms, Ordering::Relaxed, Ordering::Relaxed)
1001 .is_ok()
1002 {
1003 warn!(
1004 "[CLIENT] Sensory backpressure: dropped_messages={} last_payload_bytes={}",
1005 dropped,
1006 bytes.len()
1007 );
1008 }
1009
1010 Ok(false)
1011 }
1012 Some(Err(e)) => {
1013 let message = e.to_string();
1014 if message.contains("Not connected to peers") {
1015 static DROPPED: AtomicU64 = AtomicU64::new(0);
1016 static LAST_LOG_MS: AtomicU64 = AtomicU64::new(0);
1017
1018 let dropped = DROPPED.fetch_add(1, Ordering::Relaxed) + 1;
1019 let now_ms = SystemTime::now()
1020 .duration_since(SystemTime::UNIX_EPOCH)
1021 .unwrap_or_default()
1022 .as_millis() as u64;
1023
1024 let last_ms = LAST_LOG_MS.load(Ordering::Relaxed);
1025 if now_ms.saturating_sub(last_ms) >= 5_000
1026 && LAST_LOG_MS
1027 .compare_exchange(last_ms, now_ms, Ordering::Relaxed, Ordering::Relaxed)
1028 .is_ok()
1029 {
1030 warn!(
1031 "[CLIENT] Sensory not connected: dropped_messages={} last_payload_bytes={}",
1032 dropped,
1033 bytes.len()
1034 );
1035 }
1036
1037 return Ok(false);
1038 }
1039
1040 Err(SdkError::Zmq(e))
1041 }
1042 }
1043 }
1044
1045 pub fn receive_motor_data(
1062 &self,
1063 ) -> Result<Option<feagi_structures::neuron_voxels::xyzp::CorticalMappedXYZPNeuronVoxels>> {
1064 use feagi_structures::neuron_voxels::xyzp::CorticalMappedXYZPNeuronVoxels;
1065
1066 if !self.registered {
1067 return Err(SdkError::NotRegistered);
1068 }
1069
1070 let socket = self.motor_socket.as_ref().ok_or_else(|| {
1071 error!("[CLIENT] receive_motor_data() called but motor_socket is None");
1072 SdkError::Other("Motor socket not initialized (not a motor agent?)".to_string())
1073 })?;
1074
1075 let mut socket = socket
1076 .lock()
1077 .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
1078 let recv_result = self.block_on(async { socket.recv().now_or_never() });
1079 let message = match recv_result {
1080 None => return Ok(None),
1081 Some(Ok(message)) => message,
1082 Some(Err(e)) => return Err(SdkError::Zmq(e)),
1083 };
1084
1085 let mut frames = message.into_vec();
1089 if frames.is_empty() {
1090 return Ok(None);
1091 }
1092 let (_topic_opt, data) = if frames.len() == 2 {
1093 let topic = frames.remove(0).to_vec();
1094 trace!(
1095 "[CLIENT] Motor multipart topic: '{}'",
1096 String::from_utf8_lossy(&topic)
1097 );
1098 let data = frames.remove(0).to_vec();
1099 trace!("[CLIENT] Received motor data frame: {} bytes", data.len());
1100 (Some(topic), data)
1101 } else if frames.len() == 1 {
1102 (None, frames.remove(0).to_vec())
1103 } else {
1104 return Err(SdkError::Other(
1105 "Unexpected multipart motor payload".to_string(),
1106 ));
1107 };
1108
1109 let mut byte_container = feagi_serialization::FeagiByteContainer::new_empty();
1118 let mut data_vec = data.to_vec();
1119
1120 byte_container
1122 .try_write_data_to_container_and_verify(&mut |bytes| {
1123 std::mem::swap(bytes, &mut data_vec);
1124 Ok(())
1125 })
1126 .map_err(|e| SdkError::Other(format!("Failed to load motor data bytes: {:?}", e)))?;
1127
1128 let num_structures = byte_container
1130 .try_get_number_contained_structures()
1131 .map_err(|e| SdkError::Other(format!("Failed to get structure count: {:?}", e)))?;
1132
1133 if num_structures == 0 {
1134 return Ok(None);
1135 }
1136
1137 let boxed_struct = byte_container
1139 .try_create_new_struct_from_index(0)
1140 .map_err(|e| SdkError::Other(format!("Failed to extract motor structure: {:?}", e)))?;
1141
1142 let motor_data = boxed_struct
1144 .as_any()
1145 .downcast_ref::<CorticalMappedXYZPNeuronVoxels>()
1146 .ok_or_else(|| {
1147 SdkError::Other("Motor data is not CorticalMappedXYZPNeuronVoxels".to_string())
1148 })?
1149 .clone();
1150
1151 debug!(
1152 "[CLIENT] ✓ Received motor data ({} bytes, {} areas)",
1153 data.len(),
1154 motor_data.len()
1155 );
1156 Ok(Some(motor_data))
1157 }
1158
1159 pub fn receive_visualization_data(&self) -> Result<Option<Vec<u8>>> {
1171 if !self.registered {
1172 return Err(SdkError::NotRegistered);
1173 }
1174
1175 let socket = self.viz_socket.as_ref().ok_or_else(|| {
1176 SdkError::Other(
1177 "Visualization socket not initialized (not a visualization/infrastructure agent?)"
1178 .to_string(),
1179 )
1180 })?;
1181
1182 let mut socket = socket
1183 .lock()
1184 .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
1185 let recv_result = self.block_on(async { socket.recv().now_or_never() });
1186 match recv_result {
1187 None => Ok(None),
1188 Some(Ok(message)) => {
1189 let mut frames = message.into_vec();
1190 if frames.is_empty() {
1191 return Ok(None);
1192 }
1193 let data = frames.pop().unwrap().to_vec();
1194 debug!(
1195 "[CLIENT] ✓ Received visualization data ({} bytes)",
1196 data.len()
1197 );
1198 Ok(Some(data))
1199 }
1200 Some(Err(e)) => Err(SdkError::Zmq(e)),
1201 }
1202 }
1203
1204 pub fn control_request(
1221 &self,
1222 method: &str,
1223 route: &str,
1224 data: Option<serde_json::Value>,
1225 ) -> Result<serde_json::Value> {
1226 if !self.registered {
1227 return Err(SdkError::NotRegistered);
1228 }
1229
1230 let socket = self.control_socket.as_ref().ok_or_else(|| {
1231 SdkError::Other(
1232 "Control socket not initialized (not an infrastructure agent?)".to_string(),
1233 )
1234 })?;
1235
1236 let mut request = serde_json::json!({
1238 "method": method,
1239 "route": route,
1240 "headers": {"content-type": "application/json"},
1241 });
1242
1243 if let Some(body) = data {
1244 request["body"] = body;
1245 }
1246
1247 let response = self.send_request(socket, &request)?;
1248 debug!("[CLIENT] ✓ Control request {} {} completed", method, route);
1249 Ok(response)
1250 }
1251
1252 pub fn is_registered(&self) -> bool {
1254 self.registered
1255 }
1256
1257 pub fn agent_id(&self) -> &str {
1259 &self.config.agent_id
1260 }
1261}
1262
1263impl Drop for AgentClient {
1264 fn drop(&mut self) {
1265 debug!("[CLIENT] Dropping AgentClient: {}", self.config.agent_id);
1266
1267 if let Some(mut heartbeat) = self.heartbeat.take() {
1269 debug!("[CLIENT] Stopping heartbeat service before cleanup");
1270 heartbeat.stop();
1271 debug!("[CLIENT] Heartbeat service stopped");
1272 }
1273
1274 if self.registered {
1276 debug!("[CLIENT] Deregistering agent: {}", self.config.agent_id);
1277 if let Err(e) = self.deregister() {
1278 warn!("[CLIENT] Deregistration failed during drop: {}", e);
1279 }
1281 }
1282
1283 debug!(
1285 "[CLIENT] AgentClient dropped cleanly: {}",
1286 self.config.agent_id
1287 );
1288 }
1289}
1290
#[cfg(test)]
mod tests {
    use super::*;
    use feagi_io::AgentType;

    /// A freshly constructed client is unregistered and reports its configured id.
    #[test]
    fn test_client_creation() {
        let config = AgentConfig::new("test_agent", AgentType::Sensory)
            .with_vision_capability("camera", (640, 480), 3, "i_vision")
            .with_registration_endpoint("tcp://localhost:8000")
            .with_sensory_endpoint("tcp://localhost:5558");

        let created = AgentClient::new(config);
        assert!(created.is_ok());

        if let Ok(client) = created {
            assert!(!client.is_registered());
            assert_eq!(client.agent_id(), "test_agent");
        }
    }
}