Skip to main content

feagi_agent/core/
client.rs

1// Copyright 2025 Neuraville Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! FEAGI Agent Client implementation
5
6use crate::core::config::AgentConfig;
7use crate::core::error::{Result, SdkError};
8use crate::core::heartbeat::HeartbeatService;
9use crate::core::reconnect::{retry_with_backoff, ReconnectionStrategy};
10use feagi_io::AgentType;
11use futures::FutureExt;
12use std::future::Future;
13use std::net::{TcpStream, ToSocketAddrs};
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::{Arc, Mutex};
16use std::time::Duration;
17use std::time::SystemTime;
18use tokio::runtime::Handle;
19use tokio::runtime::Runtime;
20use tokio::task::block_in_place;
21use tokio::time::timeout;
22use tracing::{debug, error, info, trace, warn};
23use zeromq::{
24    DealerSocket, PushSocket, Socket, SocketRecv, SocketSend, SubSocket, ZmqError, ZmqMessage,
25};
26
27fn block_on_with<T>(
28    handle: &Handle,
29    runtime: Option<&Runtime>,
30    future: impl Future<Output = T>,
31) -> T {
32    if Handle::try_current().is_ok() {
33        block_in_place(|| handle.block_on(future))
34    } else if let Some(runtime) = runtime {
35        runtime.block_on(future)
36    } else {
37        handle.block_on(future)
38    }
39}
40
/// Main FEAGI Agent Client
///
/// This client handles:
/// - Registration with FEAGI
/// - Automatic heartbeat
/// - Sending sensory data
/// - Receiving motor data (for motor agents)
/// - Automatic deregistration on drop
///
/// # Example
/// ```ignore
/// use feagi_agent::{AgentClient, AgentConfig, AgentType};
///
/// let config = AgentConfig::new("my_camera", AgentType::Sensory)
///     .with_feagi_host("localhost")
///     .with_vision_capability("camera", (640, 480), 3, "i_vision");
///
/// let mut client = AgentClient::new(config)?;
/// client.connect()?;
///
/// // Send sensory data
/// client.send_sensory_data(vec![(0, 50.0), (1, 75.0)])?;
///
/// // Client auto-deregisters on drop
/// ```
pub struct AgentClient {
    /// Agent configuration; validated by `new()` before the client is built.
    config: AgentConfig,

    /// Tokio runtime handle: the ambient handle when `new()` runs inside a Tokio
    /// context, otherwise the handle of the runtime stored in `runtime`.
    runtime_handle: Handle,

    /// Owned runtime, created by `new()` only when no ambient Tokio runtime is
    /// available (`None` otherwise).
    runtime: Option<Arc<Runtime>>,

    /// Registration socket (ZMQ DEALER - shared with heartbeat)
    registration_socket: Option<Arc<Mutex<DealerSocket>>>,

    /// Sensory data socket (ZMQ PUSH)
    sensory_socket: Option<Arc<Mutex<PushSocket>>>,

    /// Motor data socket (ZMQ SUB); only created for `Motor` / `Both` agents.
    motor_socket: Option<Arc<Mutex<SubSocket>>>,

    /// Visualization stream socket (ZMQ SUB); only created for `Visualization` /
    /// `Infrastructure` agents.
    viz_socket: Option<Arc<Mutex<SubSocket>>>,

    /// Control/API socket (ZMQ DEALER - REST over ZMQ); only created for
    /// `Infrastructure` agents.
    control_socket: Option<Arc<Mutex<DealerSocket>>>,

    /// Heartbeat service; `None` until `connect()` starts it.
    heartbeat: Option<HeartbeatService>,

    /// Registration state: set to `true` when FEAGI answers a registration
    /// request with status 200, reset to `false` by `deregister()`.
    registered: bool,

    /// Last reconnect attempt timestamp (ms since UNIX epoch).
    ///
    /// `reconnect_data_streams()` uses it to skip attempts that arrive sooner
    /// than `retry_backoff_ms` after the previous one.
    last_reconnect_attempt_ms: AtomicU64,

    /// Sensory socket has completed at least one successful send.
    ///
    /// Reset to `false` whenever the data sockets are closed or (re)connected.
    sensory_connected: AtomicBool,

    /// Last successful registration response body (JSON) returned by FEAGI.
    ///
    /// FEAGI registration is performed via "REST over ZMQ" and returns a wrapper:
    /// `{ "status": 200, "body": { ... } }`. This field stores the `body` object
    /// (an empty object when the response has no `body`), and is cleared again
    /// when a registration attempt is rejected.
    ///
    /// @cursor:ffi-safe - this is used by language bindings (Java JNI) to avoid
    /// re-implementing FEAGI-specific response parsing in non-Rust SDKs.
    last_registration_body: Option<serde_json::Value>,
}
112
113impl AgentClient {
114    /// Create a new FEAGI agent client
115    ///
116    /// # Arguments
117    /// * `config` - Agent configuration
118    ///
119    pub fn new(config: AgentConfig) -> Result<Self> {
120        // Validate configuration
121        config.validate()?;
122
123        let (runtime_handle, runtime) = if let Ok(handle) = Handle::try_current() {
124            (handle, None)
125        } else {
126            let runtime = Runtime::new()
127                .map_err(|e| SdkError::Other(format!("Failed to create runtime: {}", e)))?;
128            let handle = runtime.handle().clone();
129            (handle, Some(Arc::new(runtime)))
130        };
131
132        Ok(Self {
133            config,
134            runtime_handle,
135            runtime,
136            registration_socket: None,
137            sensory_socket: None,
138            motor_socket: None,
139            viz_socket: None,
140            control_socket: None,
141            heartbeat: None,
142            registered: false,
143            last_registration_body: None,
144            last_reconnect_attempt_ms: AtomicU64::new(0),
145            sensory_connected: AtomicBool::new(false),
146        })
147    }
148
149    /// Get the last successful registration response body (JSON), if available.
150    ///
151    /// This is only set after a successful `connect()` / registration step.
152    pub fn registration_body_json(&self) -> Option<&serde_json::Value> {
153        self.last_registration_body.as_ref()
154    }
155
156    /// Connect to FEAGI and register the agent
157    ///
158    /// This will:
159    /// 1. Create ZMQ control sockets (registration/control)
160    /// 2. Register with FEAGI
161    /// 3. Create ZMQ data sockets (sensory/motor/viz)
162    /// 4. Start heartbeat service (ONLY after successful registration)
163    ///
164    /// IMPORTANT: Background threads are ONLY started AFTER successful registration.
165    /// This prevents thread interference with GUI event loops (e.g., MuJoCo, Godot).
166    /// If registration fails, NO threads are spawned.
167    ///
168    /// TIMING: FEAGI starts its ZMQ data stream services asynchronously after processing
169    /// the registration message. Data socket connection uses retry/backoff to handle
170    /// stream readiness delays.
171    pub fn connect(&mut self) -> Result<()> {
172        if self.registered {
173            return Err(SdkError::AlreadyConnected);
174        }
175
176        info!(
177            "[CLIENT] Step 0: starting connection sequence to FEAGI: {}",
178            self.config.registration_endpoint
179        );
180
181        // Step 1: Create control sockets with retry (registration/control only)
182        info!("[CLIENT] Step 1: creating control sockets (registration)...");
183        let mut socket_strategy = ReconnectionStrategy::new(
184            self.config.retry_backoff_ms,
185            self.config.registration_retries,
186        );
187        retry_with_backoff(
188            || self.create_sockets(),
189            &mut socket_strategy,
190            "Socket creation",
191        )?;
192        info!("[CLIENT] Step 1: control sockets created");
193
194        // Step 2: Register with FEAGI with retry
195        info!("[CLIENT] Step 2: registering with FEAGI...");
196        let mut reg_strategy = ReconnectionStrategy::new(
197            self.config.retry_backoff_ms,
198            self.config.registration_retries,
199        );
200        retry_with_backoff(|| self.register(), &mut reg_strategy, "Registration")?;
201        info!("[CLIENT] Step 2: registration successful");
202
203        // Step 3: Create data sockets with retry (sensory/motor/viz)
204        //
205        // Retry strategy: FEAGI may start streams asynchronously, so we retry with backoff.
206        info!("[CLIENT] Step 3: connecting to data streams with retry...");
207        info!(
208            "[CLIENT]   sensory endpoint: {}",
209            self.config.sensory_endpoint
210        );
211        if matches!(self.config.agent_type, AgentType::Motor | AgentType::Both) {
212            info!("[CLIENT]   motor endpoint: {}", self.config.motor_endpoint);
213        }
214        let mut data_socket_strategy = ReconnectionStrategy::new(
215            self.config.retry_backoff_ms,
216            self.config.registration_retries,
217        );
218        retry_with_backoff(
219            || self.connect_data_sockets(),
220            &mut data_socket_strategy,
221            "Data socket creation",
222        )?;
223        info!("[CLIENT] Step 3: data sockets connected");
224
225        // Step 4: Start heartbeat service (ONLY after successful registration)
226        info!("[CLIENT] Step 4: starting heartbeat service...");
227        if self.config.heartbeat_interval > 0.0 {
228            self.start_heartbeat()?;
229            info!("[CLIENT] Step 4: heartbeat service started");
230        } else {
231            info!("[CLIENT] Step 4: heartbeat disabled (interval = 0)");
232        }
233
234        info!(
235            "[CLIENT] fully connected to FEAGI as agent: {}",
236            self.config.agent_id
237        );
238        Ok(())
239    }
240
241    fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
242        block_on_with(&self.runtime_handle, self.runtime.as_deref(), future)
243    }
244
245    /// Create ZMQ sockets
246    fn create_sockets(&mut self) -> Result<()> {
247        // Registration socket (DEALER - for registration and heartbeat)
248        let mut reg_socket = DealerSocket::new();
249        let reg_endpoint = self.config.registration_endpoint.clone();
250        self.block_on(reg_socket.connect(&reg_endpoint))
251            .map_err(|e: ZmqError| {
252                SdkError::Other(format!("registration endpoint {}: {}", reg_endpoint, e))
253            })?;
254        self.registration_socket = Some(Arc::new(Mutex::new(reg_socket)));
255
256        // Control socket (REQ - for REST API requests over ZMQ)
257        if matches!(self.config.agent_type, AgentType::Infrastructure) {
258            let mut control_socket = DealerSocket::new();
259            self.block_on(control_socket.connect(&self.config.control_endpoint))
260                .map_err(SdkError::Zmq)?;
261            self.control_socket = Some(Arc::new(Mutex::new(control_socket)));
262            debug!("[CLIENT] ✓ Control/API socket created");
263        }
264
265        debug!("[CLIENT] ✓ ZMQ control sockets created");
266        Ok(())
267    }
268
269    /// Connect data sockets (sensory/motor/viz) after registration
270    fn connect_data_sockets(&mut self) -> Result<()> {
271        // Sensory socket (PUSH - for sending data to FEAGI)
272        self.sensory_connected.store(false, Ordering::Relaxed);
273        self.wait_for_tcp_endpoint("sensory", &self.config.sensory_endpoint)?;
274        let mut sensory_socket = PushSocket::new();
275        let sensory_endpoint = self.config.sensory_endpoint.clone();
276        self.block_on(sensory_socket.connect(&sensory_endpoint))
277            .map_err(|e: ZmqError| {
278                SdkError::Other(format!("sensory endpoint {}: {}", sensory_endpoint, e))
279            })?;
280        self.sensory_socket = Some(Arc::new(Mutex::new(sensory_socket)));
281
282        // Motor socket (SUB - for receiving motor commands from FEAGI)
283        if matches!(self.config.agent_type, AgentType::Motor | AgentType::Both) {
284            info!(
285                "[SDK-CONNECT] 🎮 Initializing motor socket for agent '{}' (type: {:?})",
286                self.config.agent_id, self.config.agent_type
287            );
288            info!(
289                "[SDK-CONNECT] 🎮 Motor endpoint: {}",
290                self.config.motor_endpoint
291            );
292
293            self.wait_for_tcp_endpoint("motor", &self.config.motor_endpoint)?;
294            let mut motor_socket = SubSocket::new();
295            let motor_endpoint = self.config.motor_endpoint.clone();
296            self.block_on(motor_socket.connect(&motor_endpoint))
297                .map_err(|e: ZmqError| {
298                    SdkError::Other(format!("motor endpoint {}: {}", motor_endpoint, e))
299                })?;
300            info!("[SDK-CONNECT] ✅ Motor socket connected");
301
302            // Subscribe to all motor messages.
303            //
304            // FEAGI motor PUB may publish either:
305            // - multipart [agent_id, data] (preferred), or
306            // - single-frame [data] (legacy).
307            //
308            // Subscribing only to agent_id would miss the legacy single-frame format entirely,
309            // and also breaks if the publisher uses an empty topic. We subscribe to all, then
310            // filter by topic in receive_motor_data().
311            info!("[SDK-CONNECT] 🎮 Subscribing to all motor topics");
312            self.block_on(motor_socket.subscribe(""))
313                .map_err(SdkError::Zmq)?;
314            info!("[SDK-CONNECT] ✅ Motor subscription set (all topics)");
315
316            self.motor_socket = Some(Arc::new(Mutex::new(motor_socket)));
317            info!("[SDK-CONNECT] ✅ Motor socket initialized successfully");
318        } else {
319            info!(
320                "[SDK-CONNECT] ⚠️ Motor socket NOT initialized (agent type: {:?})",
321                self.config.agent_type
322            );
323        }
324
325        // Visualization socket (SUB - for receiving neural activity stream from FEAGI)
326        if matches!(
327            self.config.agent_type,
328            AgentType::Visualization | AgentType::Infrastructure
329        ) {
330            self.wait_for_tcp_endpoint("visualization", &self.config.visualization_endpoint)?;
331            let mut viz_socket = SubSocket::new();
332            let viz_endpoint = self.config.visualization_endpoint.clone();
333            self.block_on(viz_socket.connect(&viz_endpoint))
334                .map_err(|e: ZmqError| {
335                    SdkError::Other(format!("visualization endpoint {}: {}", viz_endpoint, e))
336                })?;
337
338            // Subscribe to all visualization messages
339            self.block_on(viz_socket.subscribe(""))
340                .map_err(SdkError::Zmq)?;
341            self.viz_socket = Some(Arc::new(Mutex::new(viz_socket)));
342            debug!("[CLIENT] ✓ Visualization socket created");
343        }
344
345        debug!("[CLIENT] ✓ ZMQ data sockets created");
346        Ok(())
347    }
348
349    /// Reconnect data sockets (sensory/motor/viz) using configured retry/backoff.
350    pub fn reconnect_data_streams(&mut self) -> Result<()> {
351        if !self.registered {
352            return Err(SdkError::NotRegistered);
353        }
354
355        let now_ms = SystemTime::now()
356            .duration_since(SystemTime::UNIX_EPOCH)
357            .unwrap_or_default()
358            .as_millis() as u64;
359        let last_ms = self.last_reconnect_attempt_ms.load(Ordering::Relaxed);
360        let min_interval_ms = self.config.retry_backoff_ms.max(1);
361        if now_ms.saturating_sub(last_ms) < min_interval_ms {
362            return Ok(());
363        }
364        self.last_reconnect_attempt_ms
365            .store(now_ms, Ordering::Relaxed);
366
367        self.close_data_sockets();
368
369        let mut data_socket_strategy = ReconnectionStrategy::new(
370            self.config.retry_backoff_ms,
371            self.config.registration_retries,
372        );
373        retry_with_backoff(
374            || self.connect_data_sockets(),
375            &mut data_socket_strategy,
376            "Data socket reconnection",
377        )
378    }
379
380    fn close_data_sockets(&mut self) {
381        self.sensory_connected.store(false, Ordering::Relaxed);
382        if let Some(socket) = self.sensory_socket.take() {
383            if let Ok(socket) = Arc::try_unwrap(socket) {
384                match socket.into_inner() {
385                    Ok(socket) => {
386                        let _ = self.block_on(socket.close());
387                    }
388                    Err(poisoned) => {
389                        let _ = self.block_on(poisoned.into_inner().close());
390                    }
391                }
392            }
393        }
394        if let Some(socket) = self.motor_socket.take() {
395            if let Ok(socket) = Arc::try_unwrap(socket) {
396                match socket.into_inner() {
397                    Ok(socket) => {
398                        let _ = self.block_on(socket.close());
399                    }
400                    Err(poisoned) => {
401                        let _ = self.block_on(poisoned.into_inner().close());
402                    }
403                }
404            }
405        }
406        if let Some(socket) = self.viz_socket.take() {
407            if let Ok(socket) = Arc::try_unwrap(socket) {
408                match socket.into_inner() {
409                    Ok(socket) => {
410                        let _ = self.block_on(socket.close());
411                    }
412                    Err(poisoned) => {
413                        let _ = self.block_on(poisoned.into_inner().close());
414                    }
415                }
416            }
417        }
418    }
419
420    fn wait_for_tcp_endpoint(&self, label: &str, endpoint: &str) -> Result<()> {
421        let address = endpoint
422            .strip_prefix("tcp://")
423            .ok_or_else(|| {
424                SdkError::InvalidConfig(format!("Invalid {} endpoint: {}", label, endpoint))
425            })?
426            .to_string();
427        let timeout = Duration::from_millis(self.config.connection_timeout_ms);
428
429        info!("[CLIENT] checking {} endpoint: {}", label, endpoint);
430        let mut strategy = ReconnectionStrategy::new(
431            self.config.retry_backoff_ms,
432            self.config.registration_retries,
433        );
434        retry_with_backoff(
435            || {
436                let mut addrs = address.to_socket_addrs().map_err(|e| {
437                    SdkError::InvalidConfig(format!(
438                        "Failed to resolve {} endpoint {}: {}",
439                        label, endpoint, e
440                    ))
441                })?;
442                let addr = addrs.next().ok_or_else(|| {
443                    SdkError::InvalidConfig(format!(
444                        "No resolved addresses for {} endpoint {}",
445                        label, endpoint
446                    ))
447                })?;
448                TcpStream::connect_timeout(&addr, timeout).map_err(|e| {
449                    SdkError::Timeout(format!(
450                        "{} endpoint {} not reachable: {}",
451                        label, endpoint, e
452                    ))
453                })?;
454                info!("[CLIENT] ✅ {} endpoint is reachable", label);
455                Ok(())
456            },
457            &mut strategy,
458            &format!("{} endpoint availability", label),
459        )
460    }
461
462    fn send_request(
463        &self,
464        socket: &Arc<Mutex<DealerSocket>>,
465        request: &serde_json::Value,
466    ) -> Result<serde_json::Value> {
467        let mut socket = socket
468            .lock()
469            .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
470
471        let message = ZmqMessage::from(request.to_string().into_bytes());
472        self.block_on(socket.send(message)).map_err(SdkError::Zmq)?;
473
474        let timeout_ms = self.config.connection_timeout_ms;
475        let recv_result = if timeout_ms > 0 {
476            self.block_on(async {
477                timeout(std::time::Duration::from_millis(timeout_ms), socket.recv()).await
478            })
479            .map_err(|_| SdkError::Timeout(format!("Request timed out after {}ms", timeout_ms)))?
480            .map_err(SdkError::Zmq)?
481        } else {
482            self.block_on(socket.recv()).map_err(SdkError::Zmq)?
483        };
484
485        let frames = recv_result.into_vec();
486        let payload = frames
487            .last()
488            .ok_or_else(|| SdkError::Other("Response was empty".to_string()))?;
489        let response: serde_json::Value = serde_json::from_slice(payload)?;
490        Ok(response)
491    }
492
493    /// Register with FEAGI
494    fn register(&mut self) -> Result<()> {
495        let registration_msg = serde_json::json!({
496            "method": "POST",
497            "path": "/v1/agent/register",
498            "body": {
499                "agent_id": self.config.agent_id,
500                "agent_type": match self.config.agent_type {
501                    AgentType::Sensory => "sensory",
502                    AgentType::Motor => "motor",
503                    AgentType::Both => "both",
504                    AgentType::Visualization => "visualization",
505                    AgentType::Infrastructure => "infrastructure",
506                },
507                "capabilities": self.config.capabilities,
508            }
509        });
510
511        let socket = self
512            .registration_socket
513            .as_ref()
514            .ok_or_else(|| SdkError::Other("Registration socket not initialized".to_string()))?;
515
516        debug!(
517            "[CLIENT] Sending registration request for: {}",
518            self.config.agent_id
519        );
520        let response = self.send_request(socket, &registration_msg)?;
521
522        // Check response status (REST format: {"status": 200, "body": {...}})
523        let status_code = response
524            .get("status")
525            .and_then(|s| s.as_u64())
526            .unwrap_or(500);
527        if status_code == 200 {
528            self.registered = true;
529            // Capture the `body` for downstream consumers (FFI bindings).
530            let empty_body = serde_json::json!({});
531            let body = response.get("body").unwrap_or(&empty_body);
532            self.last_registration_body = Some(body.clone());
533            info!("[CLIENT] ✓ Registration successful: {:?}", response);
534            Ok(())
535        } else {
536            let empty_body = serde_json::json!({});
537            let body = response.get("body").unwrap_or(&empty_body);
538            let message = body
539                .get("error")
540                .and_then(|m| m.as_str())
541                .unwrap_or("Unknown error");
542            // Clear any previously cached registration body on failure.
543            self.last_registration_body = None;
544
545            // Check if already registered - try deregistration and retry
546            if message.contains("already registered") {
547                warn!("[CLIENT] ⚠ Agent already registered - attempting deregistration and retry");
548                self.deregister()?;
549
550                // Retry registration after deregistration
551                info!("[CLIENT] Retrying registration after deregistration...");
552                std::thread::sleep(std::time::Duration::from_millis(100)); // Brief delay
553
554                // Recursive retry (only once - avoid infinite loop)
555                self.register_with_retry_once()
556            } else {
557                error!("[CLIENT] ✗ Registration failed: {}", message);
558                Err(SdkError::RegistrationFailed(message.to_string()))
559            }
560        }
561    }
562
563    /// Register with FEAGI (with automatic retry after deregistration)
564    fn register_with_retry_once(&mut self) -> Result<()> {
565        let registration_msg = serde_json::json!({
566            "method": "POST",
567            "path": "/v1/agent/register",
568            "body": {
569                "agent_id": self.config.agent_id,
570                "agent_type": match self.config.agent_type {
571                    AgentType::Sensory => "sensory",
572                    AgentType::Motor => "motor",
573                    AgentType::Both => "both",
574                    AgentType::Visualization => "visualization",
575                    AgentType::Infrastructure => "infrastructure",
576                },
577                "capabilities": self.config.capabilities,
578            }
579        });
580
581        let socket = self
582            .registration_socket
583            .as_ref()
584            .ok_or_else(|| SdkError::Other("Registration socket not initialized".to_string()))?;
585
586        debug!(
587            "[CLIENT] Sending registration request (retry) for: {}",
588            self.config.agent_id
589        );
590        let response = self.send_request(socket, &registration_msg)?;
591
592        // Check response status
593        let status_code = response
594            .get("status")
595            .and_then(|s| s.as_u64())
596            .unwrap_or(500);
597        if status_code == 200 {
598            self.registered = true;
599            // Capture the `body` for downstream consumers (FFI bindings).
600            let empty_body = serde_json::json!({});
601            let body = response.get("body").unwrap_or(&empty_body);
602            self.last_registration_body = Some(body.clone());
603            info!(
604                "[CLIENT] ✓ Registration successful (after retry): {:?}",
605                response
606            );
607            Ok(())
608        } else {
609            let empty_body = serde_json::json!({});
610            let body = response.get("body").unwrap_or(&empty_body);
611            let message = body
612                .get("error")
613                .and_then(|m| m.as_str())
614                .unwrap_or("Unknown error");
615            self.last_registration_body = None;
616            error!("[CLIENT] ✗ Registration retry failed: {}", message);
617            Err(SdkError::RegistrationFailed(message.to_string()))
618        }
619    }
620
621    /// Deregister from FEAGI
622    fn deregister(&mut self) -> Result<()> {
623        if !self.registered && self.registration_socket.is_none() {
624            return Ok(()); // Nothing to deregister
625        }
626
627        info!("[CLIENT] Deregistering agent: {}", self.config.agent_id);
628
629        let deregistration_msg = serde_json::json!({
630            "method": "DELETE",
631            "path": "/v1/agent/deregister",
632            "body": {
633                "agent_id": self.config.agent_id,
634            }
635        });
636
637        if let Some(socket) = &self.registration_socket {
638            match self.send_request(socket, &deregistration_msg) {
639                Ok(response) => {
640                    if response.get("status").and_then(|s| s.as_u64()) == Some(200) {
641                        info!("[CLIENT] ✓ Deregistration successful");
642                    } else {
643                        warn!("[CLIENT] ⚠ Deregistration returned: {:?}", response);
644                    }
645                }
646                Err(e) => {
647                    warn!("[CLIENT] ⚠ Deregistration timeout/error: {}", e);
648                }
649            };
650        }
651
652        self.registered = false;
653        Ok(())
654    }
655
656    /// Start heartbeat service
657    fn start_heartbeat(&mut self) -> Result<()> {
658        if self.heartbeat.is_some() {
659            return Ok(());
660        }
661
662        let socket = self
663            .registration_socket
664            .as_ref()
665            .ok_or_else(|| SdkError::Other("Registration socket not initialized".to_string()))?;
666
667        let agent_type = match self.config.agent_type {
668            AgentType::Sensory => "sensory",
669            AgentType::Motor => "motor",
670            AgentType::Both => "both",
671            AgentType::Visualization => "visualization",
672            AgentType::Infrastructure => "infrastructure",
673        }
674        .to_string();
675        let capabilities = serde_json::to_value(&self.config.capabilities)
676            .map_err(|e| SdkError::Other(format!("Failed to serialize capabilities: {e}")))?;
677
678        let reconnect_spec = crate::core::heartbeat::ReconnectSpec {
679            agent_id: self.config.agent_id.clone(),
680            agent_type,
681            capabilities,
682            registration_retries: self.config.registration_retries,
683            retry_backoff_ms: self.config.retry_backoff_ms,
684        };
685
686        let mut heartbeat = HeartbeatService::new(
687            self.config.agent_id.clone(),
688            Arc::clone(socket),
689            self.runtime_handle.clone(),
690            self.runtime.clone(),
691            self.config.heartbeat_interval,
692        )
693        .with_reconnect_spec(reconnect_spec);
694
695        heartbeat.start()?;
696        self.heartbeat = Some(heartbeat);
697
698        debug!(
699            "[CLIENT] ✓ Heartbeat service started (interval: {}s)",
700            self.config.heartbeat_interval
701        );
702        Ok(())
703    }
704
705    /// Send sensory data to FEAGI
706    ///
707    /// # Arguments
708    /// * `neuron_pairs` - Vector of (neuron_id, potential) pairs
709    ///
710    /// # Example
711    /// ```ignore
712    /// client.send_sensory_data(vec![
713    ///     (0, 50.0),
714    ///     (1, 75.0),
715    ///     (2, 30.0),
716    /// ])?;
717    /// ```
718    pub fn send_sensory_data(&self, neuron_pairs: Vec<(i32, f64)>) -> Result<()> {
719        if !self.registered {
720            return Err(SdkError::NotRegistered);
721        }
722
723        let socket = self
724            .sensory_socket
725            .as_ref()
726            .ok_or_else(|| SdkError::Other("Sensory socket not initialized".to_string()))?;
727
728        // ARCHITECTURE COMPLIANCE: Use binary XYZP format, NOT JSON
729        // This serializes data using feagi_data_structures for cross-platform compatibility
730        use feagi_structures::genomic::cortical_area::CorticalID;
731        use feagi_structures::neuron_voxels::xyzp::{
732            CorticalMappedXYZPNeuronVoxels, NeuronVoxelXYZPArrays,
733        };
734
735        // Get cortical area and dimensions from vision capability
736        let vision_cap = self
737            .config
738            .capabilities
739            .vision
740            .as_ref()
741            .ok_or_else(|| SdkError::Other("No vision capability configured".to_string()))?;
742
743        let (width, _height) = vision_cap.dimensions;
744
745        // Derive cortical ID in a language-agnostic way if semantic unit+group is provided.
746        let cortical_id = if let (Some(unit), Some(group_index)) =
747            (vision_cap.unit, vision_cap.group)
748        {
749            use feagi_structures::genomic::cortical_area::io_cortical_area_configuration_flag::PercentageNeuronPositioning;
750            use feagi_structures::genomic::SensoryCorticalUnit;
751
752            let group: feagi_structures::genomic::cortical_area::descriptors::CorticalUnitIndex =
753                group_index.into();
754            let frame_change_handling = feagi_structures::genomic::cortical_area::io_cortical_area_configuration_flag::FrameChangeHandling::Absolute;
755            let percentage_neuron_positioning = PercentageNeuronPositioning::Linear;
756
757            let sensory_unit = match unit {
758                feagi_io::SensoryUnit::Infrared => SensoryCorticalUnit::Infrared,
759                feagi_io::SensoryUnit::Proximity => SensoryCorticalUnit::Proximity,
760                feagi_io::SensoryUnit::Shock => SensoryCorticalUnit::Shock,
761                feagi_io::SensoryUnit::Battery => SensoryCorticalUnit::Battery,
762                feagi_io::SensoryUnit::Servo => SensoryCorticalUnit::Servo,
763                feagi_io::SensoryUnit::AnalogGpio => SensoryCorticalUnit::AnalogGPIO,
764                feagi_io::SensoryUnit::DigitalGpio => SensoryCorticalUnit::DigitalGPIO,
765                feagi_io::SensoryUnit::MiscData => SensoryCorticalUnit::MiscData,
766                feagi_io::SensoryUnit::TextEnglishInput => SensoryCorticalUnit::TextEnglishInput,
767                feagi_io::SensoryUnit::CountInput => SensoryCorticalUnit::CountInput,
768                feagi_io::SensoryUnit::Vision => SensoryCorticalUnit::Vision,
769                feagi_io::SensoryUnit::SegmentedVision => SensoryCorticalUnit::SegmentedVision,
770                feagi_io::SensoryUnit::Accelerometer => SensoryCorticalUnit::Accelerometer,
771                feagi_io::SensoryUnit::Gyroscope => SensoryCorticalUnit::Gyroscope,
772            };
773
774            // Use the first sub-unit as the default send target for simple APIs.
775            // More advanced encoders should use the sensor cache mapping APIs instead.
776            match sensory_unit {
777                SensoryCorticalUnit::Infrared => {
778                    SensoryCorticalUnit::get_cortical_ids_array_for_infrared_with_parameters(
779                        frame_change_handling,
780                        percentage_neuron_positioning,
781                        group,
782                    )[0]
783                }
784                SensoryCorticalUnit::Proximity => {
785                    SensoryCorticalUnit::get_cortical_ids_array_for_proximity_with_parameters(
786                        frame_change_handling,
787                        percentage_neuron_positioning,
788                        group,
789                    )[0]
790                }
791                SensoryCorticalUnit::Shock => {
792                    SensoryCorticalUnit::get_cortical_ids_array_for_shock_with_parameters(
793                        frame_change_handling,
794                        percentage_neuron_positioning,
795                        group,
796                    )[0]
797                }
798                SensoryCorticalUnit::Battery => {
799                    SensoryCorticalUnit::get_cortical_ids_array_for_battery_with_parameters(
800                        frame_change_handling,
801                        percentage_neuron_positioning,
802                        group,
803                    )[0]
804                }
805                SensoryCorticalUnit::Servo => {
806                    SensoryCorticalUnit::get_cortical_ids_array_for_servo_with_parameters(
807                        frame_change_handling,
808                        percentage_neuron_positioning,
809                        group,
810                    )[0]
811                }
812                SensoryCorticalUnit::AnalogGPIO => {
813                    SensoryCorticalUnit::get_cortical_ids_array_for_analog_g_p_i_o_with_parameters(
814                        frame_change_handling,
815                        percentage_neuron_positioning,
816                        group,
817                    )[0]
818                }
819                SensoryCorticalUnit::DigitalGPIO => {
820                    SensoryCorticalUnit::get_cortical_ids_array_for_digital_g_p_i_o_with_parameters(group)[0]
821                }
822                SensoryCorticalUnit::MiscData => {
823                    SensoryCorticalUnit::get_cortical_ids_array_for_misc_data_with_parameters(
824                        frame_change_handling,
825                        group,
826                    )[0]
827                }
828                SensoryCorticalUnit::TextEnglishInput => {
829                    SensoryCorticalUnit::get_cortical_ids_array_for_text_english_input_with_parameters(
830                        frame_change_handling,
831                        group,
832                    )[0]
833                }
834                SensoryCorticalUnit::CountInput => {
835                    SensoryCorticalUnit::get_cortical_ids_array_for_count_input_with_parameters(
836                        frame_change_handling,
837                        percentage_neuron_positioning,
838                        group,
839                    )[0]
840                }
841                SensoryCorticalUnit::Vision => {
842                    SensoryCorticalUnit::get_cortical_ids_array_for_vision_with_parameters(
843                        frame_change_handling,
844                        group,
845                    )[0]
846                }
847                SensoryCorticalUnit::SegmentedVision => {
848                    SensoryCorticalUnit::get_cortical_ids_array_for_segmented_vision_with_parameters(
849                        frame_change_handling,
850                        group,
851                    )[0]
852                }
853                SensoryCorticalUnit::Accelerometer => {
854                    SensoryCorticalUnit::get_cortical_ids_array_for_accelerometer_with_parameters(
855                        frame_change_handling,
856                        percentage_neuron_positioning,
857                        group,
858                    )[0]
859                }
860                SensoryCorticalUnit::Gyroscope => {
861                    SensoryCorticalUnit::get_cortical_ids_array_for_gyroscope_with_parameters(
862                        frame_change_handling,
863                        percentage_neuron_positioning,
864                        group,
865                    )[0]
866                }
867            }
868        } else {
869            let cortical_area = &vision_cap.target_cortical_area;
870
871            // Legacy: Create CorticalID from area name
872            let mut bytes = [b' '; 8];
873            let name_bytes = cortical_area.as_bytes();
874            let copy_len = name_bytes.len().min(8);
875            bytes[..copy_len].copy_from_slice(&name_bytes[..copy_len]);
876            CorticalID::try_from_bytes(&bytes).map_err(|e| {
877                SdkError::Other(format!("Invalid cortical ID '{}': {:?}", cortical_area, e))
878            })?
879        };
880
881        // Convert flat neuron IDs to XYZP format
882        let mut x_coords = Vec::with_capacity(neuron_pairs.len());
883        let mut y_coords = Vec::with_capacity(neuron_pairs.len());
884        let mut z_coords = Vec::with_capacity(neuron_pairs.len());
885        let mut potentials = Vec::with_capacity(neuron_pairs.len());
886
887        for (neuron_id, potential) in neuron_pairs {
888            let neuron_id = neuron_id as u32;
889            x_coords.push(neuron_id % (width as u32));
890            y_coords.push(neuron_id / (width as u32));
891            z_coords.push(0); // Single channel grayscale
892            potentials.push(potential as f32);
893        }
894
895        let _neuron_count = x_coords.len(); // Reserved for future validation
896
897        // Create neuron arrays from vectors
898        let neuron_arrays =
899            NeuronVoxelXYZPArrays::new_from_vectors(x_coords, y_coords, z_coords, potentials)
900                .map_err(|e| SdkError::Other(format!("Failed to create neuron arrays: {:?}", e)))?;
901
902        // Create cortical mapped data
903        let cortical_id_log = cortical_id.as_base_64();
904        let mut cortical_mapped = CorticalMappedXYZPNeuronVoxels::new();
905        cortical_mapped.insert(cortical_id, neuron_arrays);
906
907        // Serialize to binary using FeagiByteContainer (version 2 container format)
908        let mut byte_container = feagi_serialization::FeagiByteContainer::new_empty();
909        byte_container
910            .overwrite_byte_data_with_single_struct_data(&cortical_mapped, 0)
911            .map_err(|e| SdkError::Other(format!("Failed to serialize to container: {:?}", e)))?;
912
913        let buffer = byte_container.get_byte_ref().to_vec();
914
915        // Send binary XYZP data (version 2 container format)
916        let mut socket = socket
917            .lock()
918            .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
919        let message = ZmqMessage::from(buffer.clone());
920        self.block_on(socket.send(message)).map_err(SdkError::Zmq)?;
921
922        debug!(
923            "[CLIENT] Sent {} bytes XYZP binary to {}",
924            buffer.len(),
925            cortical_id_log
926        );
927        Ok(())
928    }
929
930    /// Send pre-serialized sensory bytes to FEAGI (real-time semantics).
931    ///
932    /// This is intended for high-performance clients (e.g., Python SDK brain_input)
933    /// that already produce FeagiByteContainer bytes via Rust-side encoding caches.
934    ///
935    /// Real-time policy:
936    /// - Uses ZMQ DONTWAIT to avoid blocking the caller.
937    /// - On backpressure (EAGAIN), the message is dropped (latest-only semantics).
938    pub fn send_sensory_bytes(&self, bytes: Vec<u8>) -> Result<()> {
939        let _ = self.try_send_sensory_bytes(&bytes)?;
940        Ok(())
941    }
942
943    /// Try sending pre-serialized sensory bytes to FEAGI (non-blocking), returning whether it was sent.
944    ///
945    /// Returns:
946    /// - `Ok(true)` if the message was sent.
947    /// - `Ok(false)` if dropped due to backpressure (EAGAIN).
948    /// - `Err(...)` for other failures (not registered, socket errors).
949    pub fn try_send_sensory_bytes(&self, bytes: &[u8]) -> Result<bool> {
950        if !self.registered {
951            return Err(SdkError::NotRegistered);
952        }
953
954        let socket = self
955            .sensory_socket
956            .as_ref()
957            .ok_or_else(|| SdkError::Other("Sensory socket not initialized".to_string()))?;
958
959        let mut socket = socket
960            .lock()
961            .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
962        let message = ZmqMessage::from(bytes.to_vec());
963
964        if !self.sensory_connected.load(Ordering::Relaxed) {
965            let timeout_ms = self.config.connection_timeout_ms;
966            let send_result = self.block_on(async {
967                timeout(
968                    std::time::Duration::from_millis(timeout_ms),
969                    socket.send(message),
970                )
971                .await
972            });
973            match send_result {
974                Ok(Ok(())) => {
975                    self.sensory_connected.store(true, Ordering::Relaxed);
976                    debug!("[CLIENT] Sensory socket connected (first send)");
977                    return Ok(true);
978                }
979                Ok(Err(ZmqError::BufferFull(_))) => return Ok(false),
980                Ok(Err(e)) => {
981                    let message = e.to_string();
982                    if message.contains("Not connected to peers") {
983                        return Ok(false);
984                    }
985                    return Err(SdkError::Zmq(e));
986                }
987                Err(_) => return Ok(false),
988            }
989        }
990
991        let send_result = self.block_on(async { socket.send(message).now_or_never() });
992        match send_result {
993            Some(Ok(())) => {
994                debug!("[CLIENT] Sent {} bytes sensory (raw)", bytes.len());
995                Ok(true)
996            }
997            None | Some(Err(ZmqError::BufferFull(_))) => {
998                // REAL-TIME: Drop on pressure (do not block and do not buffer history)
999                static DROPPED: AtomicU64 = AtomicU64::new(0);
1000                static LAST_LOG_MS: AtomicU64 = AtomicU64::new(0);
1001
1002                let dropped = DROPPED.fetch_add(1, Ordering::Relaxed) + 1;
1003                let now_ms = SystemTime::now()
1004                    .duration_since(SystemTime::UNIX_EPOCH)
1005                    .unwrap_or_default()
1006                    .as_millis() as u64;
1007
1008                let last_ms = LAST_LOG_MS.load(Ordering::Relaxed);
1009                // Rate-limit warnings (max once per 5s) to avoid log spam on sustained pressure.
1010                if now_ms.saturating_sub(last_ms) >= 5_000
1011                    && LAST_LOG_MS
1012                        .compare_exchange(last_ms, now_ms, Ordering::Relaxed, Ordering::Relaxed)
1013                        .is_ok()
1014                {
1015                    warn!(
1016                        "[CLIENT] Sensory backpressure: dropped_messages={} last_payload_bytes={}",
1017                        dropped,
1018                        bytes.len()
1019                    );
1020                }
1021
1022                Ok(false)
1023            }
1024            Some(Err(e)) => {
1025                let message = e.to_string();
1026                if message.contains("Not connected to peers") {
1027                    static DROPPED: AtomicU64 = AtomicU64::new(0);
1028                    static LAST_LOG_MS: AtomicU64 = AtomicU64::new(0);
1029
1030                    let dropped = DROPPED.fetch_add(1, Ordering::Relaxed) + 1;
1031                    let now_ms = SystemTime::now()
1032                        .duration_since(SystemTime::UNIX_EPOCH)
1033                        .unwrap_or_default()
1034                        .as_millis() as u64;
1035
1036                    let last_ms = LAST_LOG_MS.load(Ordering::Relaxed);
1037                    if now_ms.saturating_sub(last_ms) >= 5_000
1038                        && LAST_LOG_MS
1039                            .compare_exchange(last_ms, now_ms, Ordering::Relaxed, Ordering::Relaxed)
1040                            .is_ok()
1041                    {
1042                        warn!(
1043                            "[CLIENT] Sensory not connected: dropped_messages={} last_payload_bytes={}",
1044                            dropped,
1045                            bytes.len()
1046                        );
1047                    }
1048
1049                    return Ok(false);
1050                }
1051
1052                Err(SdkError::Zmq(e))
1053            }
1054        }
1055    }
1056
1057    /// Receive motor data from FEAGI (non-blocking)
1058    ///
1059    /// Returns None if no data is available.
1060    /// Motor data is in binary XYZP format (CorticalMappedXYZPNeuronVoxels).
1061    ///
1062    /// # Example
1063    /// ```ignore
1064    /// use feagi_structures::neuron_voxels::xyzp::CorticalMappedXYZPNeuronVoxels;
1065    ///
1066    /// if let Some(motor_data) = client.receive_motor_data()? {
1067    ///     // Process binary motor data
1068    ///     for (cortical_id, neurons) in motor_data.iter() {
1069    ///         println!("Motor area {:?}: {} neurons", cortical_id, neurons.len());
1070    ///     }
1071    /// }
1072    /// ```
1073    pub fn receive_motor_data(
1074        &self,
1075    ) -> Result<Option<feagi_structures::neuron_voxels::xyzp::CorticalMappedXYZPNeuronVoxels>> {
1076        use feagi_structures::neuron_voxels::xyzp::CorticalMappedXYZPNeuronVoxels;
1077
1078        if !self.registered {
1079            return Err(SdkError::NotRegistered);
1080        }
1081
1082        let socket = self.motor_socket.as_ref().ok_or_else(|| {
1083            error!("[CLIENT] receive_motor_data() called but motor_socket is None");
1084            SdkError::Other("Motor socket not initialized (not a motor agent?)".to_string())
1085        })?;
1086
1087        let mut socket = socket
1088            .lock()
1089            .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
1090        let recv_result = self.block_on(async { socket.recv().now_or_never() });
1091        let message = match recv_result {
1092            None => return Ok(None),
1093            Some(Ok(message)) => message,
1094            Some(Err(e)) => return Err(SdkError::Zmq(e)),
1095        };
1096
1097        // Non-blocking receive:
1098        // - preferred multipart: [topic, data]
1099        // - legacy single-part: [data]
1100        let mut frames = message.into_vec();
1101        if frames.is_empty() {
1102            return Ok(None);
1103        }
1104        let (_topic_opt, data) = if frames.len() == 2 {
1105            let topic = frames.remove(0).to_vec();
1106            trace!(
1107                "[CLIENT] Motor multipart topic: '{}'",
1108                String::from_utf8_lossy(&topic)
1109            );
1110            let data = frames.remove(0).to_vec();
1111            trace!("[CLIENT] Received motor data frame: {} bytes", data.len());
1112            (Some(topic), data)
1113        } else if frames.len() == 1 {
1114            (None, frames.remove(0).to_vec())
1115        } else {
1116            return Err(SdkError::Other(
1117                "Unexpected multipart motor payload".to_string(),
1118            ));
1119        };
1120
1121        // Do not filter by topic here.
1122        //
1123        // FEAGI publishers have historically used different topic conventions
1124        // (agent_id, empty topic, or other routing keys). Since we subscribe to all topics,
1125        // the safest approach is to accept the motor payload regardless of topic and let
1126        // higher layers decide what to do with it.
1127
1128        // ARCHITECTURE COMPLIANCE: Deserialize binary XYZP motor data using FeagiByteContainer
1129        let mut byte_container = feagi_serialization::FeagiByteContainer::new_empty();
1130        let mut data_vec = data.to_vec();
1131
1132        // Load bytes into container
1133        byte_container
1134            .try_write_data_to_container_and_verify(&mut |bytes| {
1135                std::mem::swap(bytes, &mut data_vec);
1136                Ok(())
1137            })
1138            .map_err(|e| SdkError::Other(format!("Failed to load motor data bytes: {:?}", e)))?;
1139
1140        // Get number of structures (should be 1 for motor data)
1141        let num_structures = byte_container
1142            .try_get_number_contained_structures()
1143            .map_err(|e| SdkError::Other(format!("Failed to get structure count: {:?}", e)))?;
1144
1145        if num_structures == 0 {
1146            return Ok(None);
1147        }
1148
1149        // Extract first structure
1150        let boxed_struct = byte_container
1151            .try_create_new_struct_from_index(0)
1152            .map_err(|e| SdkError::Other(format!("Failed to extract motor structure: {:?}", e)))?;
1153
1154        // Downcast to CorticalMappedXYZPNeuronVoxels
1155        let motor_data = boxed_struct
1156            .as_any()
1157            .downcast_ref::<CorticalMappedXYZPNeuronVoxels>()
1158            .ok_or_else(|| {
1159                SdkError::Other("Motor data is not CorticalMappedXYZPNeuronVoxels".to_string())
1160            })?
1161            .clone();
1162
1163        debug!(
1164            "[CLIENT] ✓ Received motor data ({} bytes, {} areas)",
1165            data.len(),
1166            motor_data.len()
1167        );
1168        Ok(Some(motor_data))
1169    }
1170
1171    /// Receive visualization data from FEAGI (non-blocking)
1172    ///
1173    /// Returns None if no data is available.
1174    ///
1175    /// # Example
1176    /// ```ignore
1177    /// if let Some(viz_data) = client.receive_visualization_data()? {
1178    ///     // Process neural activity data
1179    ///     println!("Visualization data size: {} bytes", viz_data.len());
1180    /// }
1181    /// ```
1182    pub fn receive_visualization_data(&self) -> Result<Option<Vec<u8>>> {
1183        if !self.registered {
1184            return Err(SdkError::NotRegistered);
1185        }
1186
1187        let socket = self.viz_socket.as_ref().ok_or_else(|| {
1188            SdkError::Other(
1189                "Visualization socket not initialized (not a visualization/infrastructure agent?)"
1190                    .to_string(),
1191            )
1192        })?;
1193
1194        let mut socket = socket
1195            .lock()
1196            .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
1197        let recv_result = self.block_on(async { socket.recv().now_or_never() });
1198        match recv_result {
1199            None => Ok(None),
1200            Some(Ok(message)) => {
1201                let mut frames = message.into_vec();
1202                if frames.is_empty() {
1203                    return Ok(None);
1204                }
1205                let data = frames.pop().unwrap().to_vec();
1206                debug!(
1207                    "[CLIENT] ✓ Received visualization data ({} bytes)",
1208                    data.len()
1209                );
1210                Ok(Some(data))
1211            }
1212            Some(Err(e)) => Err(SdkError::Zmq(e)),
1213        }
1214    }
1215
1216    /// Make a REST API request to FEAGI over ZMQ
1217    ///
1218    /// # Arguments
1219    /// * `method` - HTTP method (GET, POST, PUT, DELETE)
1220    /// * `route` - API route (e.g., "/v1/system/health_check")
1221    /// * `data` - Optional request body for POST/PUT requests
1222    ///
1223    /// # Example
1224    /// ```ignore
1225    /// // GET request
1226    /// let health = client.control_request("GET", "/v1/system/health_check", None)?;
1227    ///
1228    /// // POST request
1229    /// let data = serde_json::json!({"key": "value"});
1230    /// let response = client.control_request("POST", "/v1/some/endpoint", Some(data))?;
1231    /// ```
1232    pub fn control_request(
1233        &self,
1234        method: &str,
1235        route: &str,
1236        data: Option<serde_json::Value>,
1237    ) -> Result<serde_json::Value> {
1238        if !self.registered {
1239            return Err(SdkError::NotRegistered);
1240        }
1241
1242        let socket = self.control_socket.as_ref().ok_or_else(|| {
1243            SdkError::Other(
1244                "Control socket not initialized (not an infrastructure agent?)".to_string(),
1245            )
1246        })?;
1247
1248        // Prepare REST-over-ZMQ request
1249        let mut request = serde_json::json!({
1250            "method": method,
1251            "route": route,
1252            "headers": {"content-type": "application/json"},
1253        });
1254
1255        if let Some(body) = data {
1256            request["body"] = body;
1257        }
1258
1259        let response = self.send_request(socket, &request)?;
1260        debug!("[CLIENT] ✓ Control request {} {} completed", method, route);
1261        Ok(response)
1262    }
1263
    /// Check if agent is registered
    ///
    /// Returns the client's `registered` flag. While it is `false`, the data and
    /// control methods that check it return `SdkError::NotRegistered`.
    pub fn is_registered(&self) -> bool {
        self.registered
    }
1268
    /// Get agent ID
    ///
    /// Returns the `agent_id` stored in this client's configuration, borrowed
    /// from the client.
    pub fn agent_id(&self) -> &str {
        &self.config.agent_id
    }
1273}
1274
/// Best-effort teardown for [`AgentClient`].
impl Drop for AgentClient {
    /// Shuts the client down in a fixed order: stop the heartbeat service, then
    /// deregister from FEAGI if the client is still marked as registered.
    ///
    /// A deregistration failure is logged as a warning and otherwise ignored, so
    /// dropping never propagates that error.
    fn drop(&mut self) {
        debug!("[CLIENT] Dropping AgentClient: {}", self.config.agent_id);

        // Step 1: Stop heartbeat service first (this stops background threads)
        // `take()` moves the service out of the client, so it is stopped and
        // dropped here rather than later with the remaining fields.
        if let Some(mut heartbeat) = self.heartbeat.take() {
            debug!("[CLIENT] Stopping heartbeat service before cleanup");
            heartbeat.stop();
            debug!("[CLIENT] Heartbeat service stopped");
        }

        // Step 2: Deregister from FEAGI (after threads stopped)
        if self.registered {
            debug!("[CLIENT] Deregistering agent: {}", self.config.agent_id);
            if let Err(e) = self.deregister() {
                warn!("[CLIENT] Deregistration failed during drop: {}", e);
                // Continue cleanup even if deregistration fails
            }
        }

        // Step 3: Sockets will be dropped automatically
        // (struct fields are dropped after this function returns).
        // Note: this message is logged even when deregistration failed above.
        debug!(
            "[CLIENT] AgentClient dropped cleanly: {}",
            self.config.agent_id
        );
    }
}
1302
#[cfg(test)]
mod tests {
    use super::*;
    use feagi_io::AgentType;

    /// Building a client from a complete configuration succeeds, and the fresh
    /// client is unregistered and reports the configured agent ID.
    #[test]
    fn test_client_creation() {
        let config = AgentConfig::new("test_agent", AgentType::Sensory)
            .with_vision_capability("camera", (640, 480), 3, "i_vision")
            .with_registration_endpoint("tcp://localhost:8000")
            .with_sensory_endpoint("tcp://localhost:5558");

        // `unwrap` fails the test if construction returns an error.
        let client = AgentClient::new(config).unwrap();

        assert!(!client.is_registered());
        assert_eq!(client.agent_id(), "test_agent");
    }

    // Note: Full integration tests require a running FEAGI instance
    // and should be in separate integration test files
}