feagi_agent/core/
client.rs

1// Copyright 2025 Neuraville Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! FEAGI Agent Client implementation
5
6use crate::core::config::AgentConfig;
7use crate::core::error::{Result, SdkError};
8use crate::core::heartbeat::HeartbeatService;
9use crate::core::reconnect::{retry_with_backoff, ReconnectionStrategy};
10use feagi_io::AgentType;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex};
13use std::time::SystemTime;
14use tracing::{debug, error, info, trace, warn};
15
16/// Main FEAGI Agent Client
17///
18/// This client handles:
19/// - Registration with FEAGI
20/// - Automatic heartbeat
21/// - Sending sensory data
22/// - Receiving motor data (for motor agents)
23/// - Automatic deregistration on drop
24///
25/// # Example
26/// ```ignore
27/// use feagi_agent::{AgentClient, AgentConfig, AgentType};
28///
29/// let config = AgentConfig::new("my_camera", AgentType::Sensory)
30///     .with_feagi_host("localhost")
31///     .with_vision_capability("camera", (640, 480), 3, "i_vision");
32///
33/// let mut client = AgentClient::new(config)?;
34/// client.connect()?;
35///
36/// // Send sensory data
37/// client.send_sensory_data(vec![(0, 50.0), (1, 75.0)])?;
38///
39/// // Client auto-deregisters on drop
40/// ```
41pub struct AgentClient {
42    /// Configuration
43    config: AgentConfig,
44
45    /// ZMQ context
46    context: zmq::Context,
47
48    /// Registration socket (ZMQ REQ - shared with heartbeat)
49    registration_socket: Option<Arc<Mutex<zmq::Socket>>>,
50
51    /// Sensory data socket (ZMQ PUSH)
52    sensory_socket: Option<zmq::Socket>,
53
54    /// Motor data socket (ZMQ SUB)
55    motor_socket: Option<zmq::Socket>,
56
57    /// Visualization stream socket (ZMQ SUB)
58    viz_socket: Option<zmq::Socket>,
59
60    /// Control/API socket (ZMQ REQ - REST over ZMQ)
61    control_socket: Option<zmq::Socket>,
62
63    /// Heartbeat service
64    heartbeat: Option<HeartbeatService>,
65
66    /// Registration state
67    registered: bool,
68
69    /// Last successful registration response body (JSON) returned by FEAGI.
70    ///
71    /// FEAGI registration is performed via "REST over ZMQ" and returns a wrapper:
72    /// `{ "status": 200, "body": { ... } }`. This field stores the `body` object.
73    ///
74    /// @cursor:ffi-safe - this is used by language bindings (Java JNI) to avoid
75    /// re-implementing FEAGI-specific response parsing in non-Rust SDKs.
76    last_registration_body: Option<serde_json::Value>,
77}
78
79impl AgentClient {
80    /// Create a new FEAGI agent client
81    ///
82    /// # Arguments
83    /// * `config` - Agent configuration
84    pub fn new(config: AgentConfig) -> Result<Self> {
85        // Validate configuration
86        config.validate()?;
87
88        let context = zmq::Context::new();
89
90        Ok(Self {
91            config,
92            context,
93            registration_socket: None,
94            sensory_socket: None,
95            motor_socket: None,
96            viz_socket: None,
97            control_socket: None,
98            heartbeat: None,
99            registered: false,
100            last_registration_body: None,
101        })
102    }
103
104    /// Get the last successful registration response body (JSON), if available.
105    ///
106    /// This is only set after a successful `connect()` / registration step.
107    pub fn registration_body_json(&self) -> Option<&serde_json::Value> {
108        self.last_registration_body.as_ref()
109    }
110
111    /// Connect to FEAGI and register the agent
112    ///
113    /// This will:
114    /// 1. Create ZMQ sockets
115    /// 2. Register with FEAGI
116    /// 3. Start heartbeat service (ONLY after successful registration)
117    ///
118    /// IMPORTANT: Background threads are ONLY started AFTER successful registration.
119    /// This prevents thread interference with GUI event loops (e.g., MuJoCo, Godot).
120    /// If registration fails, NO threads are spawned.
121    pub fn connect(&mut self) -> Result<()> {
122        if self.registered {
123            return Err(SdkError::AlreadyConnected);
124        }
125
126        info!(
127            "[CLIENT] Connecting to FEAGI: {}",
128            self.config.registration_endpoint
129        );
130
131        // Step 1: Create sockets with retry
132        let mut socket_strategy = ReconnectionStrategy::new(
133            self.config.retry_backoff_ms,
134            self.config.registration_retries,
135        );
136        retry_with_backoff(
137            || self.create_sockets(),
138            &mut socket_strategy,
139            "Socket creation",
140        )?;
141
142        // Step 2: Register with FEAGI with retry
143        let mut reg_strategy = ReconnectionStrategy::new(
144            self.config.retry_backoff_ms,
145            self.config.registration_retries,
146        );
147        retry_with_backoff(|| self.register(), &mut reg_strategy, "Registration")?;
148
149        // Step 3: Start heartbeat service (ONLY after successful registration)
150        // This is critical: threads are only spawned AFTER we know FEAGI is reachable
151        if self.config.heartbeat_interval > 0.0 {
152            debug!("[CLIENT] Starting heartbeat service (post-registration)");
153            self.start_heartbeat()?;
154        } else {
155            debug!("[CLIENT] Heartbeat disabled (interval = 0)");
156        }
157
158        info!(
159            "[CLIENT] ✓ Connected and registered as: {}",
160            self.config.agent_id
161        );
162        Ok(())
163    }
164
165    /// Create ZMQ sockets
166    fn create_sockets(&mut self) -> Result<()> {
167        // Registration socket (REQ - for registration and heartbeat)
168        let reg_socket = self.context.socket(zmq::REQ)?;
169        reg_socket.set_rcvtimeo(self.config.connection_timeout_ms as i32)?;
170        reg_socket.set_sndtimeo(self.config.connection_timeout_ms as i32)?;
171        // @architecture:acceptable - compatibility with FEAGI ZMQ-REST ROUTER behavior
172        // Heartbeat uses the same REQ socket as registration. If FEAGI delays a reply,
173        // strict REQ state can raise EFSM on subsequent sends. Relaxed mode prevents
174        // heartbeat from breaking the socket state machine deterministically.
175        let _ = reg_socket.set_req_relaxed(true);
176        reg_socket.connect(&self.config.registration_endpoint)?;
177        self.registration_socket = Some(Arc::new(Mutex::new(reg_socket)));
178
179        // Sensory socket (PUSH - for sending data to FEAGI)
180        let sensory_socket = self.context.socket(zmq::PUSH)?;
181        sensory_socket.set_sndhwm(self.config.sensory_send_hwm)?;
182        sensory_socket.set_linger(self.config.sensory_linger_ms)?;
183        sensory_socket.set_immediate(self.config.sensory_immediate)?;
184        sensory_socket.connect(&self.config.sensory_endpoint)?;
185        self.sensory_socket = Some(sensory_socket);
186
187        // Motor socket (SUB - for receiving motor commands from FEAGI)
188        if matches!(self.config.agent_type, AgentType::Motor | AgentType::Both) {
189            info!(
190                "[SDK-CONNECT] 🎮 Initializing motor socket for agent '{}' (type: {:?})",
191                self.config.agent_id, self.config.agent_type
192            );
193            info!(
194                "[SDK-CONNECT] 🎮 Motor endpoint: {}",
195                self.config.motor_endpoint
196            );
197
198            let motor_socket = self.context.socket(zmq::SUB)?;
199            motor_socket.connect(&self.config.motor_endpoint)?;
200            info!("[SDK-CONNECT] ✅ Motor socket connected");
201
202            // Subscribe to all motor messages.
203            //
204            // FEAGI motor PUB may publish either:
205            // - multipart [agent_id, data] (preferred), or
206            // - single-frame [data] (legacy).
207            //
208            // Subscribing only to agent_id would miss the legacy single-frame format entirely,
209            // and also breaks if the publisher uses an empty topic. We subscribe to all, then
210            // filter by topic in receive_motor_data().
211            info!("[SDK-CONNECT] 🎮 Subscribing to all motor topics");
212            motor_socket.set_subscribe(b"")?;
213            info!("[SDK-CONNECT] ✅ Motor subscription set (all topics)");
214
215            self.motor_socket = Some(motor_socket);
216            info!("[SDK-CONNECT] ✅ Motor socket initialized successfully");
217        } else {
218            info!(
219                "[SDK-CONNECT] ⚠️ Motor socket NOT initialized (agent type: {:?})",
220                self.config.agent_type
221            );
222        }
223
224        // Visualization socket (SUB - for receiving neural activity stream from FEAGI)
225        if matches!(
226            self.config.agent_type,
227            AgentType::Visualization | AgentType::Infrastructure
228        ) {
229            let viz_socket = self.context.socket(zmq::SUB)?;
230            viz_socket.connect(&self.config.visualization_endpoint)?;
231
232            // Subscribe to all visualization messages
233            viz_socket.set_subscribe(b"")?;
234            self.viz_socket = Some(viz_socket);
235            debug!("[CLIENT] ✓ Visualization socket created");
236        }
237
238        // Control socket (REQ - for REST API requests over ZMQ)
239        if matches!(self.config.agent_type, AgentType::Infrastructure) {
240            let control_socket = self.context.socket(zmq::REQ)?;
241            control_socket.set_rcvtimeo(self.config.connection_timeout_ms as i32)?;
242            control_socket.set_sndtimeo(self.config.connection_timeout_ms as i32)?;
243            control_socket.connect(&self.config.control_endpoint)?;
244            self.control_socket = Some(control_socket);
245            debug!("[CLIENT] ✓ Control/API socket created");
246        }
247
248        debug!("[CLIENT] ✓ ZMQ sockets created");
249        Ok(())
250    }
251
252    /// Register with FEAGI
253    fn register(&mut self) -> Result<()> {
254        let registration_msg = serde_json::json!({
255            "method": "POST",
256            "path": "/v1/agent/register",
257            "body": {
258                "agent_id": self.config.agent_id,
259                "agent_type": match self.config.agent_type {
260                    AgentType::Sensory => "sensory",
261                    AgentType::Motor => "motor",
262                    AgentType::Both => "both",
263                    AgentType::Visualization => "visualization",
264                    AgentType::Infrastructure => "infrastructure",
265                },
266                "capabilities": self.config.capabilities,
267            }
268        });
269
270        let socket = self
271            .registration_socket
272            .as_ref()
273            .ok_or_else(|| SdkError::Other("Registration socket not initialized".to_string()))?;
274
275        // Send registration request and get response
276        let response = {
277            let socket = socket
278                .lock()
279                .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
280
281            debug!(
282                "[CLIENT] Sending registration request for: {}",
283                self.config.agent_id
284            );
285            socket.send(registration_msg.to_string().as_bytes(), 0)?;
286
287            // Wait for response
288            let response_bytes = socket.recv_bytes(0)?;
289            serde_json::from_slice::<serde_json::Value>(&response_bytes)?
290        }; // Lock is dropped here
291
292        // Check response status (REST format: {"status": 200, "body": {...}})
293        let status_code = response
294            .get("status")
295            .and_then(|s| s.as_u64())
296            .unwrap_or(500);
297        if status_code == 200 {
298            self.registered = true;
299            // Capture the `body` for downstream consumers (FFI bindings).
300            let empty_body = serde_json::json!({});
301            let body = response.get("body").unwrap_or(&empty_body);
302            self.last_registration_body = Some(body.clone());
303            info!("[CLIENT] ✓ Registration successful: {:?}", response);
304            Ok(())
305        } else {
306            let empty_body = serde_json::json!({});
307            let body = response.get("body").unwrap_or(&empty_body);
308            let message = body
309                .get("error")
310                .and_then(|m| m.as_str())
311                .unwrap_or("Unknown error");
312            // Clear any previously cached registration body on failure.
313            self.last_registration_body = None;
314
315            // Check if already registered - try deregistration and retry
316            if message.contains("already registered") {
317                warn!("[CLIENT] ⚠ Agent already registered - attempting deregistration and retry");
318                self.deregister()?;
319
320                // Retry registration after deregistration
321                info!("[CLIENT] Retrying registration after deregistration...");
322                std::thread::sleep(std::time::Duration::from_millis(100)); // Brief delay
323
324                // Recursive retry (only once - avoid infinite loop)
325                self.register_with_retry_once()
326            } else {
327                error!("[CLIENT] ✗ Registration failed: {}", message);
328                Err(SdkError::RegistrationFailed(message.to_string()))
329            }
330        }
331    }
332
333    /// Register with FEAGI (with automatic retry after deregistration)
334    fn register_with_retry_once(&mut self) -> Result<()> {
335        let registration_msg = serde_json::json!({
336            "method": "POST",
337            "path": "/v1/agent/register",
338            "body": {
339                "agent_id": self.config.agent_id,
340                "agent_type": match self.config.agent_type {
341                    AgentType::Sensory => "sensory",
342                    AgentType::Motor => "motor",
343                    AgentType::Both => "both",
344                    AgentType::Visualization => "visualization",
345                    AgentType::Infrastructure => "infrastructure",
346                },
347                "capabilities": self.config.capabilities,
348            }
349        });
350
351        let socket = self
352            .registration_socket
353            .as_ref()
354            .ok_or_else(|| SdkError::Other("Registration socket not initialized".to_string()))?;
355
356        // Send registration request and get response
357        let response = {
358            let socket = socket
359                .lock()
360                .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
361
362            debug!(
363                "[CLIENT] Sending registration request (retry) for: {}",
364                self.config.agent_id
365            );
366            socket.send(registration_msg.to_string().as_bytes(), 0)?;
367
368            // Wait for response
369            let response_bytes = socket.recv_bytes(0)?;
370            serde_json::from_slice::<serde_json::Value>(&response_bytes)?
371        }; // Lock is dropped here
372
373        // Check response status
374        let status_code = response
375            .get("status")
376            .and_then(|s| s.as_u64())
377            .unwrap_or(500);
378        if status_code == 200 {
379            self.registered = true;
380            // Capture the `body` for downstream consumers (FFI bindings).
381            let empty_body = serde_json::json!({});
382            let body = response.get("body").unwrap_or(&empty_body);
383            self.last_registration_body = Some(body.clone());
384            info!(
385                "[CLIENT] ✓ Registration successful (after retry): {:?}",
386                response
387            );
388            Ok(())
389        } else {
390            let empty_body = serde_json::json!({});
391            let body = response.get("body").unwrap_or(&empty_body);
392            let message = body
393                .get("error")
394                .and_then(|m| m.as_str())
395                .unwrap_or("Unknown error");
396            self.last_registration_body = None;
397            error!("[CLIENT] ✗ Registration retry failed: {}", message);
398            Err(SdkError::RegistrationFailed(message.to_string()))
399        }
400    }
401
402    /// Deregister from FEAGI
403    fn deregister(&mut self) -> Result<()> {
404        if !self.registered && self.registration_socket.is_none() {
405            return Ok(()); // Nothing to deregister
406        }
407
408        info!("[CLIENT] Deregistering agent: {}", self.config.agent_id);
409
410        let deregistration_msg = serde_json::json!({
411            "method": "DELETE",
412            "path": "/v1/agent/deregister",
413            "body": {
414                "agent_id": self.config.agent_id,
415            }
416        });
417
418        if let Some(socket) = &self.registration_socket {
419            let socket = socket
420                .lock()
421                .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
422
423            // Send deregistration request
424            if let Err(e) = socket.send(deregistration_msg.to_string().as_bytes(), 0) {
425                warn!("[CLIENT] ⚠ Failed to send deregistration: {}", e);
426                return Ok(()); // Don't fail on deregistration error
427            }
428
429            // Wait for response (with timeout)
430            match socket.recv_bytes(0) {
431                Ok(response_bytes) => {
432                    let response: serde_json::Value = serde_json::from_slice(&response_bytes)?;
433                    if response.get("status").and_then(|s| s.as_str()) == Some("success") {
434                        info!("[CLIENT] ✓ Deregistration successful");
435                    } else {
436                        warn!("[CLIENT] ⚠ Deregistration returned: {:?}", response);
437                    }
438                }
439                Err(e) => {
440                    warn!("[CLIENT] ⚠ Deregistration timeout/error: {}", e);
441                }
442            }
443        }
444
445        self.registered = false;
446        Ok(())
447    }
448
449    /// Start heartbeat service
450    fn start_heartbeat(&mut self) -> Result<()> {
451        if self.heartbeat.is_some() {
452            return Ok(());
453        }
454
455        let socket = self
456            .registration_socket
457            .as_ref()
458            .ok_or_else(|| SdkError::Other("Registration socket not initialized".to_string()))?;
459
460        let agent_type = match self.config.agent_type {
461            AgentType::Sensory => "sensory",
462            AgentType::Motor => "motor",
463            AgentType::Both => "both",
464            AgentType::Visualization => "visualization",
465            AgentType::Infrastructure => "infrastructure",
466        }
467        .to_string();
468        let capabilities = serde_json::to_value(&self.config.capabilities)
469            .map_err(|e| SdkError::Other(format!("Failed to serialize capabilities: {e}")))?;
470
471        let reconnect_spec = crate::core::heartbeat::ReconnectSpec {
472            agent_id: self.config.agent_id.clone(),
473            agent_type,
474            capabilities,
475            registration_retries: self.config.registration_retries,
476            retry_backoff_ms: self.config.retry_backoff_ms,
477        };
478
479        let mut heartbeat = HeartbeatService::new(
480            self.config.agent_id.clone(),
481            Arc::clone(socket),
482            self.config.heartbeat_interval,
483        )
484        .with_reconnect_spec(reconnect_spec);
485
486        heartbeat.start()?;
487        self.heartbeat = Some(heartbeat);
488
489        debug!(
490            "[CLIENT] ✓ Heartbeat service started (interval: {}s)",
491            self.config.heartbeat_interval
492        );
493        Ok(())
494    }
495
496    /// Send sensory data to FEAGI
497    ///
498    /// # Arguments
499    /// * `neuron_pairs` - Vector of (neuron_id, potential) pairs
500    ///
501    /// # Example
502    /// ```ignore
503    /// client.send_sensory_data(vec![
504    ///     (0, 50.0),
505    ///     (1, 75.0),
506    ///     (2, 30.0),
507    /// ])?;
508    /// ```
509    pub fn send_sensory_data(&self, neuron_pairs: Vec<(i32, f64)>) -> Result<()> {
510        if !self.registered {
511            return Err(SdkError::NotRegistered);
512        }
513
514        let socket = self
515            .sensory_socket
516            .as_ref()
517            .ok_or_else(|| SdkError::Other("Sensory socket not initialized".to_string()))?;
518
519        // ARCHITECTURE COMPLIANCE: Use binary XYZP format, NOT JSON
520        // This serializes data using feagi_data_structures for cross-platform compatibility
521        use feagi_structures::genomic::cortical_area::CorticalID;
522        use feagi_structures::neuron_voxels::xyzp::{
523            CorticalMappedXYZPNeuronVoxels, NeuronVoxelXYZPArrays,
524        };
525
526        // Get cortical area and dimensions from vision capability
527        let vision_cap = self
528            .config
529            .capabilities
530            .vision
531            .as_ref()
532            .ok_or_else(|| SdkError::Other("No vision capability configured".to_string()))?;
533
534        let (width, _height) = vision_cap.dimensions;
535
536        // Derive cortical ID in a language-agnostic way if semantic unit+group is provided.
537        let cortical_id = if let (Some(unit), Some(group_index)) =
538            (vision_cap.unit, vision_cap.group)
539        {
540            use feagi_structures::genomic::cortical_area::io_cortical_area_configuration_flag::PercentageNeuronPositioning;
541            use feagi_structures::genomic::SensoryCorticalUnit;
542
543            let group: feagi_structures::genomic::cortical_area::descriptors::CorticalUnitIndex =
544                group_index.into();
545            let frame_change_handling = feagi_structures::genomic::cortical_area::io_cortical_area_configuration_flag::FrameChangeHandling::Absolute;
546            let percentage_neuron_positioning = PercentageNeuronPositioning::Linear;
547
548            let sensory_unit = match unit {
549                feagi_io::SensoryUnit::Infrared => SensoryCorticalUnit::Infrared,
550                feagi_io::SensoryUnit::Proximity => SensoryCorticalUnit::Proximity,
551                feagi_io::SensoryUnit::Shock => SensoryCorticalUnit::Shock,
552                feagi_io::SensoryUnit::Battery => SensoryCorticalUnit::Battery,
553                feagi_io::SensoryUnit::Servo => SensoryCorticalUnit::Servo,
554                feagi_io::SensoryUnit::AnalogGpio => SensoryCorticalUnit::AnalogGPIO,
555                feagi_io::SensoryUnit::DigitalGpio => SensoryCorticalUnit::DigitalGPIO,
556                feagi_io::SensoryUnit::MiscData => SensoryCorticalUnit::MiscData,
557                feagi_io::SensoryUnit::TextEnglishInput => SensoryCorticalUnit::TextEnglishInput,
558                feagi_io::SensoryUnit::CountInput => SensoryCorticalUnit::CountInput,
559                feagi_io::SensoryUnit::Vision => SensoryCorticalUnit::Vision,
560                feagi_io::SensoryUnit::SegmentedVision => SensoryCorticalUnit::SegmentedVision,
561                feagi_io::SensoryUnit::Accelerometer => SensoryCorticalUnit::Accelerometer,
562                feagi_io::SensoryUnit::Gyroscope => SensoryCorticalUnit::Gyroscope,
563            };
564
565            // Use the first sub-unit as the default send target for simple APIs.
566            // More advanced encoders should use the sensor cache mapping APIs instead.
567            match sensory_unit {
568                SensoryCorticalUnit::Infrared => {
569                    SensoryCorticalUnit::get_cortical_ids_array_for_infrared_with_parameters(
570                        frame_change_handling,
571                        percentage_neuron_positioning,
572                        group,
573                    )[0]
574                }
575                SensoryCorticalUnit::Proximity => {
576                    SensoryCorticalUnit::get_cortical_ids_array_for_proximity_with_parameters(
577                        frame_change_handling,
578                        percentage_neuron_positioning,
579                        group,
580                    )[0]
581                }
582                SensoryCorticalUnit::Shock => {
583                    SensoryCorticalUnit::get_cortical_ids_array_for_shock_with_parameters(
584                        frame_change_handling,
585                        percentage_neuron_positioning,
586                        group,
587                    )[0]
588                }
589                SensoryCorticalUnit::Battery => {
590                    SensoryCorticalUnit::get_cortical_ids_array_for_battery_with_parameters(
591                        frame_change_handling,
592                        percentage_neuron_positioning,
593                        group,
594                    )[0]
595                }
596                SensoryCorticalUnit::Servo => {
597                    SensoryCorticalUnit::get_cortical_ids_array_for_servo_with_parameters(
598                        frame_change_handling,
599                        percentage_neuron_positioning,
600                        group,
601                    )[0]
602                }
603                SensoryCorticalUnit::AnalogGPIO => {
604                    SensoryCorticalUnit::get_cortical_ids_array_for_analog_g_p_i_o_with_parameters(
605                        frame_change_handling,
606                        percentage_neuron_positioning,
607                        group,
608                    )[0]
609                }
610                SensoryCorticalUnit::DigitalGPIO => {
611                    SensoryCorticalUnit::get_cortical_ids_array_for_digital_g_p_i_o_with_parameters(group)[0]
612                }
613                SensoryCorticalUnit::MiscData => {
614                    SensoryCorticalUnit::get_cortical_ids_array_for_misc_data_with_parameters(
615                        frame_change_handling,
616                        group,
617                    )[0]
618                }
619                SensoryCorticalUnit::TextEnglishInput => {
620                    SensoryCorticalUnit::get_cortical_ids_array_for_text_english_input_with_parameters(
621                        frame_change_handling,
622                        group,
623                    )[0]
624                }
625                SensoryCorticalUnit::CountInput => {
626                    SensoryCorticalUnit::get_cortical_ids_array_for_count_input_with_parameters(
627                        frame_change_handling,
628                        percentage_neuron_positioning,
629                        group,
630                    )[0]
631                }
632                SensoryCorticalUnit::Vision => {
633                    SensoryCorticalUnit::get_cortical_ids_array_for_vision_with_parameters(
634                        frame_change_handling,
635                        group,
636                    )[0]
637                }
638                SensoryCorticalUnit::SegmentedVision => {
639                    SensoryCorticalUnit::get_cortical_ids_array_for_segmented_vision_with_parameters(
640                        frame_change_handling,
641                        group,
642                    )[0]
643                }
644                SensoryCorticalUnit::Accelerometer => {
645                    SensoryCorticalUnit::get_cortical_ids_array_for_accelerometer_with_parameters(
646                        frame_change_handling,
647                        percentage_neuron_positioning,
648                        group,
649                    )[0]
650                }
651                SensoryCorticalUnit::Gyroscope => {
652                    SensoryCorticalUnit::get_cortical_ids_array_for_gyroscope_with_parameters(
653                        frame_change_handling,
654                        percentage_neuron_positioning,
655                        group,
656                    )[0]
657                }
658            }
659        } else {
660            let cortical_area = &vision_cap.target_cortical_area;
661
662            // Legacy: Create CorticalID from area name
663            let mut bytes = [b' '; 8];
664            let name_bytes = cortical_area.as_bytes();
665            let copy_len = name_bytes.len().min(8);
666            bytes[..copy_len].copy_from_slice(&name_bytes[..copy_len]);
667            CorticalID::try_from_bytes(&bytes).map_err(|e| {
668                SdkError::Other(format!("Invalid cortical ID '{}': {:?}", cortical_area, e))
669            })?
670        };
671
672        // Convert flat neuron IDs to XYZP format
673        let mut x_coords = Vec::with_capacity(neuron_pairs.len());
674        let mut y_coords = Vec::with_capacity(neuron_pairs.len());
675        let mut z_coords = Vec::with_capacity(neuron_pairs.len());
676        let mut potentials = Vec::with_capacity(neuron_pairs.len());
677
678        for (neuron_id, potential) in neuron_pairs {
679            let neuron_id = neuron_id as u32;
680            x_coords.push(neuron_id % (width as u32));
681            y_coords.push(neuron_id / (width as u32));
682            z_coords.push(0); // Single channel grayscale
683            potentials.push(potential as f32);
684        }
685
686        let _neuron_count = x_coords.len(); // Reserved for future validation
687
688        // Create neuron arrays from vectors
689        let neuron_arrays =
690            NeuronVoxelXYZPArrays::new_from_vectors(x_coords, y_coords, z_coords, potentials)
691                .map_err(|e| SdkError::Other(format!("Failed to create neuron arrays: {:?}", e)))?;
692
693        // Create cortical mapped data
694        let cortical_id_log = cortical_id.as_base_64();
695        let mut cortical_mapped = CorticalMappedXYZPNeuronVoxels::new();
696        cortical_mapped.insert(cortical_id, neuron_arrays);
697
698        // Serialize to binary using FeagiByteContainer (version 2 container format)
699        let mut byte_container = feagi_serialization::FeagiByteContainer::new_empty();
700        byte_container
701            .overwrite_byte_data_with_single_struct_data(&cortical_mapped, 0)
702            .map_err(|e| SdkError::Other(format!("Failed to serialize to container: {:?}", e)))?;
703
704        let buffer = byte_container.get_byte_ref().to_vec();
705
706        // Send binary XYZP data (version 2 container format)
707        socket.send(&buffer, 0)?;
708
709        debug!(
710            "[CLIENT] Sent {} bytes XYZP binary to {}",
711            buffer.len(),
712            cortical_id_log
713        );
714        Ok(())
715    }
716
717    /// Send pre-serialized sensory bytes to FEAGI (real-time semantics).
718    ///
719    /// This is intended for high-performance clients (e.g., Python SDK brain_input)
720    /// that already produce FeagiByteContainer bytes via Rust-side encoding caches.
721    ///
722    /// Real-time policy:
723    /// - Uses ZMQ DONTWAIT to avoid blocking the caller.
724    /// - On backpressure (EAGAIN), the message is dropped (latest-only semantics).
725    pub fn send_sensory_bytes(&self, bytes: Vec<u8>) -> Result<()> {
726        let _ = self.try_send_sensory_bytes(&bytes)?;
727        Ok(())
728    }
729
730    /// Try sending pre-serialized sensory bytes to FEAGI (non-blocking), returning whether it was sent.
731    ///
732    /// Returns:
733    /// - `Ok(true)` if the message was sent.
734    /// - `Ok(false)` if dropped due to backpressure (EAGAIN).
735    /// - `Err(...)` for other failures (not registered, socket errors).
736    pub fn try_send_sensory_bytes(&self, bytes: &[u8]) -> Result<bool> {
737        if !self.registered {
738            return Err(SdkError::NotRegistered);
739        }
740
741        let socket = self
742            .sensory_socket
743            .as_ref()
744            .ok_or_else(|| SdkError::Other("Sensory socket not initialized".to_string()))?;
745
746        match socket.send(bytes, zmq::DONTWAIT) {
747            Ok(()) => {
748                debug!("[CLIENT] Sent {} bytes sensory (raw)", bytes.len());
749                Ok(true)
750            }
751            Err(zmq::Error::EAGAIN) => {
752                // REAL-TIME: Drop on pressure (do not block and do not buffer history)
753                static DROPPED: AtomicU64 = AtomicU64::new(0);
754                static LAST_LOG_MS: AtomicU64 = AtomicU64::new(0);
755
756                let dropped = DROPPED.fetch_add(1, Ordering::Relaxed) + 1;
757                let now_ms = SystemTime::now()
758                    .duration_since(SystemTime::UNIX_EPOCH)
759                    .unwrap_or_default()
760                    .as_millis() as u64;
761
762                let last_ms = LAST_LOG_MS.load(Ordering::Relaxed);
763                // Rate-limit warnings (max once per 5s) to avoid log spam on sustained pressure.
764                if now_ms.saturating_sub(last_ms) >= 5_000
765                    && LAST_LOG_MS
766                        .compare_exchange(last_ms, now_ms, Ordering::Relaxed, Ordering::Relaxed)
767                        .is_ok()
768                {
769                    warn!(
770                        "[CLIENT] Sensory backpressure: dropped_messages={} last_payload_bytes={}",
771                        dropped,
772                        bytes.len()
773                    );
774                }
775
776                Ok(false)
777            }
778            Err(e) => Err(SdkError::Zmq(e)),
779        }
780    }
781
782    /// Receive motor data from FEAGI (non-blocking)
783    ///
784    /// Returns None if no data is available.
785    /// Motor data is in binary XYZP format (CorticalMappedXYZPNeuronVoxels).
786    ///
787    /// # Example
788    /// ```ignore
789    /// use feagi_structures::neuron_voxels::xyzp::CorticalMappedXYZPNeuronVoxels;
790    ///
791    /// if let Some(motor_data) = client.receive_motor_data()? {
792    ///     // Process binary motor data
793    ///     for (cortical_id, neurons) in motor_data.iter() {
794    ///         println!("Motor area {:?}: {} neurons", cortical_id, neurons.len());
795    ///     }
796    /// }
797    /// ```
798    pub fn receive_motor_data(
799        &self,
800    ) -> Result<Option<feagi_structures::neuron_voxels::xyzp::CorticalMappedXYZPNeuronVoxels>> {
801        use feagi_structures::neuron_voxels::xyzp::CorticalMappedXYZPNeuronVoxels;
802
803        if !self.registered {
804            return Err(SdkError::NotRegistered);
805        }
806
807        let socket = self.motor_socket.as_ref().ok_or_else(|| {
808            error!("[CLIENT] receive_motor_data() called but motor_socket is None");
809            SdkError::Other("Motor socket not initialized (not a motor agent?)".to_string())
810        })?;
811
812        // Non-blocking receive:
813        // - preferred multipart: [topic, data]
814        // - legacy single-part: [data]
815        match socket.recv_bytes(zmq::DONTWAIT) {
816            Ok(first_frame) => {
817                trace!(
818                    "[CLIENT] Received first motor frame: {} bytes",
819                    first_frame.len()
820                );
821
822                // Check if more frames are available (multipart)
823                let (_topic_opt, data) = if socket.get_rcvmore().map_err(SdkError::Zmq)? {
824                    // First frame is the topic, second frame is the motor data
825                    let topic = first_frame;
826                    trace!(
827                        "[CLIENT] Motor multipart topic: '{}'",
828                        String::from_utf8_lossy(&topic)
829                    );
830                    trace!("[CLIENT] Receiving second frame (motor data)");
831                    let data = socket.recv_bytes(0).map_err(|e| {
832                        error!("[CLIENT] Failed to receive second frame: {}", e);
833                        SdkError::Zmq(e)
834                    })?;
835                    trace!("[CLIENT] Received motor data frame: {} bytes", data.len());
836                    (Some(topic), data)
837                } else {
838                    // Legacy single-part format: treat first frame as data
839                    (None, first_frame)
840                };
841
842                // Do not filter by topic here.
843                //
844                // FEAGI publishers have historically used different topic conventions
845                // (agent_id, empty topic, or other routing keys). Since we subscribe to all topics,
846                // the safest approach is to accept the motor payload regardless of topic and let
847                // higher layers decide what to do with it.
848
849                // ARCHITECTURE COMPLIANCE: Deserialize binary XYZP motor data using FeagiByteContainer
850                let mut byte_container = feagi_serialization::FeagiByteContainer::new_empty();
851                let mut data_vec = data.to_vec();
852
853                // Load bytes into container
854                byte_container
855                    .try_write_data_to_container_and_verify(&mut |bytes| {
856                        std::mem::swap(bytes, &mut data_vec);
857                        Ok(())
858                    })
859                    .map_err(|e| {
860                        SdkError::Other(format!("Failed to load motor data bytes: {:?}", e))
861                    })?;
862
863                // Get number of structures (should be 1 for motor data)
864                let num_structures = byte_container
865                    .try_get_number_contained_structures()
866                    .map_err(|e| {
867                        SdkError::Other(format!("Failed to get structure count: {:?}", e))
868                    })?;
869
870                if num_structures == 0 {
871                    return Ok(None);
872                }
873
874                // Extract first structure
875                let boxed_struct =
876                    byte_container
877                        .try_create_new_struct_from_index(0)
878                        .map_err(|e| {
879                            SdkError::Other(format!("Failed to extract motor structure: {:?}", e))
880                        })?;
881
882                // Downcast to CorticalMappedXYZPNeuronVoxels
883                let motor_data = boxed_struct
884                    .as_any()
885                    .downcast_ref::<CorticalMappedXYZPNeuronVoxels>()
886                    .ok_or_else(|| {
887                        SdkError::Other(
888                            "Motor data is not CorticalMappedXYZPNeuronVoxels".to_string(),
889                        )
890                    })?
891                    .clone();
892
893                debug!(
894                    "[CLIENT] ✓ Received motor data ({} bytes, {} areas)",
895                    data.len(),
896                    motor_data.len()
897                );
898                Ok(Some(motor_data))
899            }
900            Err(zmq::Error::EAGAIN) => {
901                // No data available (FEAGI not publishing OR slow joiner syndrome)
902                Ok(None)
903            }
904            Err(e) => {
905                error!("[CLIENT] ❌ ZMQ error on motor receive: {}", e);
906                Err(SdkError::Zmq(e))
907            }
908        }
909    }
910
911    /// Receive visualization data from FEAGI (non-blocking)
912    ///
913    /// Returns None if no data is available.
914    ///
915    /// # Example
916    /// ```ignore
917    /// if let Some(viz_data) = client.receive_visualization_data()? {
918    ///     // Process neural activity data
919    ///     println!("Visualization data size: {} bytes", viz_data.len());
920    /// }
921    /// ```
922    pub fn receive_visualization_data(&self) -> Result<Option<Vec<u8>>> {
923        if !self.registered {
924            return Err(SdkError::NotRegistered);
925        }
926
927        let socket = self.viz_socket.as_ref().ok_or_else(|| {
928            SdkError::Other(
929                "Visualization socket not initialized (not a visualization/infrastructure agent?)"
930                    .to_string(),
931            )
932        })?;
933
934        // Non-blocking receive
935        match socket.recv_bytes(zmq::DONTWAIT) {
936            Ok(data) => {
937                debug!(
938                    "[CLIENT] ✓ Received visualization data ({} bytes)",
939                    data.len()
940                );
941                Ok(Some(data))
942            }
943            Err(zmq::Error::EAGAIN) => Ok(None), // No data available
944            Err(e) => Err(SdkError::Zmq(e)),
945        }
946    }
947
948    /// Make a REST API request to FEAGI over ZMQ
949    ///
950    /// # Arguments
951    /// * `method` - HTTP method (GET, POST, PUT, DELETE)
952    /// * `route` - API route (e.g., "/v1/system/health_check")
953    /// * `data` - Optional request body for POST/PUT requests
954    ///
955    /// # Example
956    /// ```ignore
957    /// // GET request
958    /// let health = client.control_request("GET", "/v1/system/health_check", None)?;
959    ///
960    /// // POST request
961    /// let data = serde_json::json!({"key": "value"});
962    /// let response = client.control_request("POST", "/v1/some/endpoint", Some(data))?;
963    /// ```
964    pub fn control_request(
965        &self,
966        method: &str,
967        route: &str,
968        data: Option<serde_json::Value>,
969    ) -> Result<serde_json::Value> {
970        if !self.registered {
971            return Err(SdkError::NotRegistered);
972        }
973
974        let socket = self.control_socket.as_ref().ok_or_else(|| {
975            SdkError::Other(
976                "Control socket not initialized (not an infrastructure agent?)".to_string(),
977            )
978        })?;
979
980        // Prepare REST-over-ZMQ request
981        let mut request = serde_json::json!({
982            "method": method,
983            "route": route,
984            "headers": {"content-type": "application/json"},
985        });
986
987        if let Some(body) = data {
988            request["body"] = body;
989        }
990
991        // Send request
992        socket.send(request.to_string().as_bytes(), 0)?;
993
994        // Wait for response
995        let response_bytes = socket.recv_bytes(0)?;
996        let response: serde_json::Value = serde_json::from_slice(&response_bytes)?;
997
998        debug!("[CLIENT] ✓ Control request {} {} completed", method, route);
999        Ok(response)
1000    }
1001
1002    /// Check if agent is registered
1003    pub fn is_registered(&self) -> bool {
1004        self.registered
1005    }
1006
1007    /// Get agent ID
1008    pub fn agent_id(&self) -> &str {
1009        &self.config.agent_id
1010    }
1011}
1012
1013impl Drop for AgentClient {
1014    fn drop(&mut self) {
1015        debug!("[CLIENT] Dropping AgentClient: {}", self.config.agent_id);
1016
1017        // Step 1: Stop heartbeat service first (this stops background threads)
1018        if let Some(mut heartbeat) = self.heartbeat.take() {
1019            debug!("[CLIENT] Stopping heartbeat service before cleanup");
1020            heartbeat.stop();
1021            debug!("[CLIENT] Heartbeat service stopped");
1022        }
1023
1024        // Step 2: Deregister from FEAGI (after threads stopped)
1025        if self.registered {
1026            debug!("[CLIENT] Deregistering agent: {}", self.config.agent_id);
1027            if let Err(e) = self.deregister() {
1028                warn!("[CLIENT] Deregistration failed during drop: {}", e);
1029                // Continue cleanup even if deregistration fails
1030            }
1031        }
1032
1033        // Step 3: Sockets will be dropped automatically
1034        debug!(
1035            "[CLIENT] AgentClient dropped cleanly: {}",
1036            self.config.agent_id
1037        );
1038    }
1039}
1040
#[cfg(test)]
mod tests {
    use super::*;
    use feagi_io::AgentType;

    /// A client built from a sensory configuration with a vision capability and explicit
    /// endpoints is expected to construct successfully, start out unregistered, and
    /// report the configured agent id.
    #[test]
    fn test_client_creation() {
        let sensory_config = AgentConfig::new("test_agent", AgentType::Sensory)
            .with_vision_capability("camera", (640, 480), 3, "i_vision")
            .with_registration_endpoint("tcp://localhost:8000")
            .with_sensory_endpoint("tcp://localhost:5558");

        let creation = AgentClient::new(sensory_config);
        assert!(creation.is_ok());

        let agent = creation.unwrap();
        assert!(!agent.is_registered());
        assert_eq!(agent.agent_id(), "test_agent");
    }

    // Note: Full integration tests require a running FEAGI instance
    // and should be in separate integration test files
}