Skip to main content

feagi_agent/core/
client.rs

1// Copyright 2025 Neuraville Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! FEAGI Agent Client implementation
5
6use crate::core::config::AgentConfig;
7use crate::core::error::{Result, SdkError};
8use crate::core::heartbeat::HeartbeatService;
9use crate::core::reconnect::{retry_with_backoff, ReconnectionStrategy};
10use feagi_io::AgentType;
11use futures::FutureExt;
12use std::future::Future;
13use std::net::{TcpStream, ToSocketAddrs};
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::{Arc, Mutex};
16use std::time::Duration;
17use std::time::SystemTime;
18use tokio::runtime::Handle;
19use tokio::runtime::Runtime;
20use tokio::task::block_in_place;
21use tokio::time::timeout;
22use tracing::{debug, error, info, trace, warn};
23use zeromq::{
24    DealerSocket, PushSocket, Socket, SocketRecv, SocketSend, SubSocket, ZmqError, ZmqMessage,
25};
26
27fn block_on_with<T>(
28    handle: &Handle,
29    runtime: Option<&Runtime>,
30    future: impl Future<Output = T>,
31) -> T {
32    if Handle::try_current().is_ok() {
33        block_in_place(|| handle.block_on(future))
34    } else if let Some(runtime) = runtime {
35        runtime.block_on(future)
36    } else {
37        handle.block_on(future)
38    }
39}
40
41/// Main FEAGI Agent Client
42///
43/// This client handles:
44/// - Registration with FEAGI
45/// - Automatic heartbeat
46/// - Sending sensory data
47/// - Receiving motor data (for motor agents)
48/// - Automatic deregistration on drop
49///
50/// # Example
51/// ```ignore
52/// use feagi_agent::{AgentClient, AgentConfig, AgentType};
53///
54/// let config = AgentConfig::new("my_camera", AgentType::Sensory)
55///     .with_feagi_host("localhost")
56///     .with_vision_capability("camera", (640, 480), 3, "i_vision");
57///
58/// let mut client = AgentClient::new(config)?;
59/// client.connect()?;
60///
61/// // Send sensory data
62/// client.send_sensory_data(vec![(0, 50.0), (1, 75.0)])?;
63///
64/// // Client auto-deregisters on drop
65/// ```
66pub struct AgentClient {
67    /// Configuration
68    config: AgentConfig,
69
70    /// Tokio runtime handle (ambient if available)
71    runtime_handle: Handle,
72
73    /// Owned runtime when created outside tokio
74    runtime: Option<Arc<Runtime>>,
75
76    /// Registration socket (ZMQ DEALER - shared with heartbeat)
77    registration_socket: Option<Arc<Mutex<DealerSocket>>>,
78
79    /// Sensory data socket (ZMQ PUSH)
80    sensory_socket: Option<Arc<Mutex<PushSocket>>>,
81
82    /// Motor data socket (ZMQ SUB)
83    motor_socket: Option<Arc<Mutex<SubSocket>>>,
84
85    /// Visualization stream socket (ZMQ SUB)
86    viz_socket: Option<Arc<Mutex<SubSocket>>>,
87
88    /// Control/API socket (ZMQ DEALER - REST over ZMQ)
89    control_socket: Option<Arc<Mutex<DealerSocket>>>,
90
91    /// Heartbeat service
92    heartbeat: Option<HeartbeatService>,
93
94    /// Registration state
95    registered: bool,
96
97    /// Last reconnect attempt timestamp (ms since UNIX epoch)
98    last_reconnect_attempt_ms: AtomicU64,
99
100    /// Sensory socket has completed at least one successful send
101    sensory_connected: AtomicBool,
102
103    /// Last successful registration response body (JSON) returned by FEAGI.
104    ///
105    /// FEAGI registration is performed via "REST over ZMQ" and returns a wrapper:
106    /// `{ "status": 200, "body": { ... } }`. This field stores the `body` object.
107    ///
108    /// @cursor:ffi-safe - this is used by language bindings (Java JNI) to avoid
109    /// re-implementing FEAGI-specific response parsing in non-Rust SDKs.
110    last_registration_body: Option<serde_json::Value>,
111}
112
113impl AgentClient {
114    /// Create a new FEAGI agent client
115    ///
116    /// # Arguments
117    /// * `config` - Agent configuration
118    ///
119    pub fn new(config: AgentConfig) -> Result<Self> {
120        // Validate configuration
121        config.validate()?;
122
123        let (runtime_handle, runtime) = if let Ok(handle) = Handle::try_current() {
124            (handle, None)
125        } else {
126            let runtime = Runtime::new()
127                .map_err(|e| SdkError::Other(format!("Failed to create runtime: {}", e)))?;
128            let handle = runtime.handle().clone();
129            (handle, Some(Arc::new(runtime)))
130        };
131
132        Ok(Self {
133            config,
134            runtime_handle,
135            runtime,
136            registration_socket: None,
137            sensory_socket: None,
138            motor_socket: None,
139            viz_socket: None,
140            control_socket: None,
141            heartbeat: None,
142            registered: false,
143            last_registration_body: None,
144            last_reconnect_attempt_ms: AtomicU64::new(0),
145            sensory_connected: AtomicBool::new(false),
146        })
147    }
148
149    /// Get the last successful registration response body (JSON), if available.
150    ///
151    /// This is only set after a successful `connect()` / registration step.
152    pub fn registration_body_json(&self) -> Option<&serde_json::Value> {
153        self.last_registration_body.as_ref()
154    }
155
156    /// Connect to FEAGI and register the agent
157    ///
158    /// This will:
159    /// 1. Create ZMQ control sockets (registration/control)
160    /// 2. Register with FEAGI
161    /// 3. Create ZMQ data sockets (sensory/motor/viz)
162    /// 4. Start heartbeat service (ONLY after successful registration)
163    ///
164    /// IMPORTANT: Background threads are ONLY started AFTER successful registration.
165    /// This prevents thread interference with GUI event loops (e.g., MuJoCo, Godot).
166    /// If registration fails, NO threads are spawned.
167    ///
168    /// TIMING: FEAGI starts its ZMQ data stream services asynchronously after processing
169    /// the registration message. Data socket connection uses retry/backoff to handle
170    /// stream readiness delays.
171    pub fn connect(&mut self) -> Result<()> {
172        if self.registered {
173            return Err(SdkError::AlreadyConnected);
174        }
175
176        info!(
177            "[CLIENT] Step 0: starting connection sequence to FEAGI: {}",
178            self.config.registration_endpoint
179        );
180
181        // Step 1: Create control sockets with retry (registration/control only)
182        info!("[CLIENT] Step 1: creating control sockets (registration)...");
183        let mut socket_strategy = ReconnectionStrategy::new(
184            self.config.retry_backoff_ms,
185            self.config.registration_retries,
186        );
187        retry_with_backoff(
188            || self.create_sockets(),
189            &mut socket_strategy,
190            "Socket creation",
191        )?;
192        info!("[CLIENT] Step 1: control sockets created");
193
194        // Step 2: Register with FEAGI with retry
195        info!("[CLIENT] Step 2: registering with FEAGI...");
196        let mut reg_strategy = ReconnectionStrategy::new(
197            self.config.retry_backoff_ms,
198            self.config.registration_retries,
199        );
200        retry_with_backoff(|| self.register(), &mut reg_strategy, "Registration")?;
201        info!("[CLIENT] Step 2: registration successful");
202
203        // Step 3: Create data sockets with retry (sensory/motor/viz)
204        //
205        // Retry strategy: FEAGI may start streams asynchronously, so we retry with backoff.
206        info!("[CLIENT] Step 3: connecting to data streams with retry...");
207        info!(
208            "[CLIENT]   sensory endpoint: {}",
209            self.config.sensory_endpoint
210        );
211        if matches!(self.config.agent_type, AgentType::Motor | AgentType::Both) {
212            info!("[CLIENT]   motor endpoint: {}", self.config.motor_endpoint);
213        }
214        let mut data_socket_strategy = ReconnectionStrategy::new(
215            self.config.retry_backoff_ms,
216            self.config.registration_retries,
217        );
218        retry_with_backoff(
219            || self.connect_data_sockets(),
220            &mut data_socket_strategy,
221            "Data socket creation",
222        )?;
223        info!("[CLIENT] Step 3: data sockets connected");
224
225        // Step 4: Start heartbeat service (ONLY after successful registration)
226        info!("[CLIENT] Step 4: starting heartbeat service...");
227        if self.config.heartbeat_interval > 0.0 {
228            self.start_heartbeat()?;
229            info!("[CLIENT] Step 4: heartbeat service started");
230        } else {
231            info!("[CLIENT] Step 4: heartbeat disabled (interval = 0)");
232        }
233
234        info!(
235            "[CLIENT] fully connected to FEAGI as agent: {}",
236            self.config.agent_id
237        );
238        Ok(())
239    }
240
241    fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
242        block_on_with(&self.runtime_handle, self.runtime.as_deref(), future)
243    }
244
245    /// Create ZMQ sockets
246    fn create_sockets(&mut self) -> Result<()> {
247        // Registration socket (DEALER - for registration and heartbeat)
248        let mut reg_socket = DealerSocket::new();
249        self.block_on(reg_socket.connect(&self.config.registration_endpoint))
250            .map_err(SdkError::Zmq)?;
251        self.registration_socket = Some(Arc::new(Mutex::new(reg_socket)));
252
253        // Control socket (REQ - for REST API requests over ZMQ)
254        if matches!(self.config.agent_type, AgentType::Infrastructure) {
255            let mut control_socket = DealerSocket::new();
256            self.block_on(control_socket.connect(&self.config.control_endpoint))
257                .map_err(SdkError::Zmq)?;
258            self.control_socket = Some(Arc::new(Mutex::new(control_socket)));
259            debug!("[CLIENT] ✓ Control/API socket created");
260        }
261
262        debug!("[CLIENT] ✓ ZMQ control sockets created");
263        Ok(())
264    }
265
266    /// Connect data sockets (sensory/motor/viz) after registration
267    fn connect_data_sockets(&mut self) -> Result<()> {
268        // Sensory socket (PUSH - for sending data to FEAGI)
269        self.sensory_connected.store(false, Ordering::Relaxed);
270        self.wait_for_tcp_endpoint("sensory", &self.config.sensory_endpoint)?;
271        let mut sensory_socket = PushSocket::new();
272        self.block_on(sensory_socket.connect(&self.config.sensory_endpoint))
273            .map_err(SdkError::Zmq)?;
274        self.sensory_socket = Some(Arc::new(Mutex::new(sensory_socket)));
275
276        // Motor socket (SUB - for receiving motor commands from FEAGI)
277        if matches!(self.config.agent_type, AgentType::Motor | AgentType::Both) {
278            info!(
279                "[SDK-CONNECT] 🎮 Initializing motor socket for agent '{}' (type: {:?})",
280                self.config.agent_id, self.config.agent_type
281            );
282            info!(
283                "[SDK-CONNECT] 🎮 Motor endpoint: {}",
284                self.config.motor_endpoint
285            );
286
287            self.wait_for_tcp_endpoint("motor", &self.config.motor_endpoint)?;
288            let mut motor_socket = SubSocket::new();
289            self.block_on(motor_socket.connect(&self.config.motor_endpoint))
290                .map_err(SdkError::Zmq)?;
291            info!("[SDK-CONNECT] ✅ Motor socket connected");
292
293            // Subscribe to all motor messages.
294            //
295            // FEAGI motor PUB may publish either:
296            // - multipart [agent_id, data] (preferred), or
297            // - single-frame [data] (legacy).
298            //
299            // Subscribing only to agent_id would miss the legacy single-frame format entirely,
300            // and also breaks if the publisher uses an empty topic. We subscribe to all, then
301            // filter by topic in receive_motor_data().
302            info!("[SDK-CONNECT] 🎮 Subscribing to all motor topics");
303            self.block_on(motor_socket.subscribe(""))
304                .map_err(SdkError::Zmq)?;
305            info!("[SDK-CONNECT] ✅ Motor subscription set (all topics)");
306
307            self.motor_socket = Some(Arc::new(Mutex::new(motor_socket)));
308            info!("[SDK-CONNECT] ✅ Motor socket initialized successfully");
309        } else {
310            info!(
311                "[SDK-CONNECT] ⚠️ Motor socket NOT initialized (agent type: {:?})",
312                self.config.agent_type
313            );
314        }
315
316        // Visualization socket (SUB - for receiving neural activity stream from FEAGI)
317        if matches!(
318            self.config.agent_type,
319            AgentType::Visualization | AgentType::Infrastructure
320        ) {
321            self.wait_for_tcp_endpoint("visualization", &self.config.visualization_endpoint)?;
322            let mut viz_socket = SubSocket::new();
323            self.block_on(viz_socket.connect(&self.config.visualization_endpoint))
324                .map_err(SdkError::Zmq)?;
325
326            // Subscribe to all visualization messages
327            self.block_on(viz_socket.subscribe(""))
328                .map_err(SdkError::Zmq)?;
329            self.viz_socket = Some(Arc::new(Mutex::new(viz_socket)));
330            debug!("[CLIENT] ✓ Visualization socket created");
331        }
332
333        debug!("[CLIENT] ✓ ZMQ data sockets created");
334        Ok(())
335    }
336
337    /// Reconnect data sockets (sensory/motor/viz) using configured retry/backoff.
338    pub fn reconnect_data_streams(&mut self) -> Result<()> {
339        if !self.registered {
340            return Err(SdkError::NotRegistered);
341        }
342
343        let now_ms = SystemTime::now()
344            .duration_since(SystemTime::UNIX_EPOCH)
345            .unwrap_or_default()
346            .as_millis() as u64;
347        let last_ms = self.last_reconnect_attempt_ms.load(Ordering::Relaxed);
348        let min_interval_ms = self.config.retry_backoff_ms.max(1);
349        if now_ms.saturating_sub(last_ms) < min_interval_ms {
350            return Ok(());
351        }
352        self.last_reconnect_attempt_ms
353            .store(now_ms, Ordering::Relaxed);
354
355        self.close_data_sockets();
356
357        let mut data_socket_strategy = ReconnectionStrategy::new(
358            self.config.retry_backoff_ms,
359            self.config.registration_retries,
360        );
361        retry_with_backoff(
362            || self.connect_data_sockets(),
363            &mut data_socket_strategy,
364            "Data socket reconnection",
365        )
366    }
367
368    fn close_data_sockets(&mut self) {
369        self.sensory_connected.store(false, Ordering::Relaxed);
370        if let Some(socket) = self.sensory_socket.take() {
371            if let Ok(socket) = Arc::try_unwrap(socket) {
372                match socket.into_inner() {
373                    Ok(socket) => {
374                        let _ = self.block_on(socket.close());
375                    }
376                    Err(poisoned) => {
377                        let _ = self.block_on(poisoned.into_inner().close());
378                    }
379                }
380            }
381        }
382        if let Some(socket) = self.motor_socket.take() {
383            if let Ok(socket) = Arc::try_unwrap(socket) {
384                match socket.into_inner() {
385                    Ok(socket) => {
386                        let _ = self.block_on(socket.close());
387                    }
388                    Err(poisoned) => {
389                        let _ = self.block_on(poisoned.into_inner().close());
390                    }
391                }
392            }
393        }
394        if let Some(socket) = self.viz_socket.take() {
395            if let Ok(socket) = Arc::try_unwrap(socket) {
396                match socket.into_inner() {
397                    Ok(socket) => {
398                        let _ = self.block_on(socket.close());
399                    }
400                    Err(poisoned) => {
401                        let _ = self.block_on(poisoned.into_inner().close());
402                    }
403                }
404            }
405        }
406    }
407
408    fn wait_for_tcp_endpoint(&self, label: &str, endpoint: &str) -> Result<()> {
409        let address = endpoint
410            .strip_prefix("tcp://")
411            .ok_or_else(|| {
412                SdkError::InvalidConfig(format!("Invalid {} endpoint: {}", label, endpoint))
413            })?
414            .to_string();
415        let timeout = Duration::from_millis(self.config.connection_timeout_ms);
416
417        info!("[CLIENT] checking {} endpoint: {}", label, endpoint);
418        let mut strategy = ReconnectionStrategy::new(
419            self.config.retry_backoff_ms,
420            self.config.registration_retries,
421        );
422        retry_with_backoff(
423            || {
424                let mut addrs = address.to_socket_addrs().map_err(|e| {
425                    SdkError::InvalidConfig(format!(
426                        "Failed to resolve {} endpoint {}: {}",
427                        label, endpoint, e
428                    ))
429                })?;
430                let addr = addrs.next().ok_or_else(|| {
431                    SdkError::InvalidConfig(format!(
432                        "No resolved addresses for {} endpoint {}",
433                        label, endpoint
434                    ))
435                })?;
436                TcpStream::connect_timeout(&addr, timeout).map_err(|e| {
437                    SdkError::Timeout(format!(
438                        "{} endpoint {} not reachable: {}",
439                        label, endpoint, e
440                    ))
441                })?;
442                info!("[CLIENT] ✅ {} endpoint is reachable", label);
443                Ok(())
444            },
445            &mut strategy,
446            &format!("{} endpoint availability", label),
447        )
448    }
449
450    fn send_request(
451        &self,
452        socket: &Arc<Mutex<DealerSocket>>,
453        request: &serde_json::Value,
454    ) -> Result<serde_json::Value> {
455        let mut socket = socket
456            .lock()
457            .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
458
459        let message = ZmqMessage::from(request.to_string().into_bytes());
460        self.block_on(socket.send(message)).map_err(SdkError::Zmq)?;
461
462        let timeout_ms = self.config.connection_timeout_ms;
463        let recv_result = if timeout_ms > 0 {
464            self.block_on(async {
465                timeout(std::time::Duration::from_millis(timeout_ms), socket.recv()).await
466            })
467            .map_err(|_| SdkError::Timeout(format!("Request timed out after {}ms", timeout_ms)))?
468            .map_err(SdkError::Zmq)?
469        } else {
470            self.block_on(socket.recv()).map_err(SdkError::Zmq)?
471        };
472
473        let frames = recv_result.into_vec();
474        let payload = frames
475            .last()
476            .ok_or_else(|| SdkError::Other("Response was empty".to_string()))?;
477        let response: serde_json::Value = serde_json::from_slice(payload)?;
478        Ok(response)
479    }
480
481    /// Register with FEAGI
482    fn register(&mut self) -> Result<()> {
483        let registration_msg = serde_json::json!({
484            "method": "POST",
485            "path": "/v1/agent/register",
486            "body": {
487                "agent_id": self.config.agent_id,
488                "agent_type": match self.config.agent_type {
489                    AgentType::Sensory => "sensory",
490                    AgentType::Motor => "motor",
491                    AgentType::Both => "both",
492                    AgentType::Visualization => "visualization",
493                    AgentType::Infrastructure => "infrastructure",
494                },
495                "capabilities": self.config.capabilities,
496            }
497        });
498
499        let socket = self
500            .registration_socket
501            .as_ref()
502            .ok_or_else(|| SdkError::Other("Registration socket not initialized".to_string()))?;
503
504        debug!(
505            "[CLIENT] Sending registration request for: {}",
506            self.config.agent_id
507        );
508        let response = self.send_request(socket, &registration_msg)?;
509
510        // Check response status (REST format: {"status": 200, "body": {...}})
511        let status_code = response
512            .get("status")
513            .and_then(|s| s.as_u64())
514            .unwrap_or(500);
515        if status_code == 200 {
516            self.registered = true;
517            // Capture the `body` for downstream consumers (FFI bindings).
518            let empty_body = serde_json::json!({});
519            let body = response.get("body").unwrap_or(&empty_body);
520            self.last_registration_body = Some(body.clone());
521            info!("[CLIENT] ✓ Registration successful: {:?}", response);
522            Ok(())
523        } else {
524            let empty_body = serde_json::json!({});
525            let body = response.get("body").unwrap_or(&empty_body);
526            let message = body
527                .get("error")
528                .and_then(|m| m.as_str())
529                .unwrap_or("Unknown error");
530            // Clear any previously cached registration body on failure.
531            self.last_registration_body = None;
532
533            // Check if already registered - try deregistration and retry
534            if message.contains("already registered") {
535                warn!("[CLIENT] ⚠ Agent already registered - attempting deregistration and retry");
536                self.deregister()?;
537
538                // Retry registration after deregistration
539                info!("[CLIENT] Retrying registration after deregistration...");
540                std::thread::sleep(std::time::Duration::from_millis(100)); // Brief delay
541
542                // Recursive retry (only once - avoid infinite loop)
543                self.register_with_retry_once()
544            } else {
545                error!("[CLIENT] ✗ Registration failed: {}", message);
546                Err(SdkError::RegistrationFailed(message.to_string()))
547            }
548        }
549    }
550
551    /// Register with FEAGI (with automatic retry after deregistration)
552    fn register_with_retry_once(&mut self) -> Result<()> {
553        let registration_msg = serde_json::json!({
554            "method": "POST",
555            "path": "/v1/agent/register",
556            "body": {
557                "agent_id": self.config.agent_id,
558                "agent_type": match self.config.agent_type {
559                    AgentType::Sensory => "sensory",
560                    AgentType::Motor => "motor",
561                    AgentType::Both => "both",
562                    AgentType::Visualization => "visualization",
563                    AgentType::Infrastructure => "infrastructure",
564                },
565                "capabilities": self.config.capabilities,
566            }
567        });
568
569        let socket = self
570            .registration_socket
571            .as_ref()
572            .ok_or_else(|| SdkError::Other("Registration socket not initialized".to_string()))?;
573
574        debug!(
575            "[CLIENT] Sending registration request (retry) for: {}",
576            self.config.agent_id
577        );
578        let response = self.send_request(socket, &registration_msg)?;
579
580        // Check response status
581        let status_code = response
582            .get("status")
583            .and_then(|s| s.as_u64())
584            .unwrap_or(500);
585        if status_code == 200 {
586            self.registered = true;
587            // Capture the `body` for downstream consumers (FFI bindings).
588            let empty_body = serde_json::json!({});
589            let body = response.get("body").unwrap_or(&empty_body);
590            self.last_registration_body = Some(body.clone());
591            info!(
592                "[CLIENT] ✓ Registration successful (after retry): {:?}",
593                response
594            );
595            Ok(())
596        } else {
597            let empty_body = serde_json::json!({});
598            let body = response.get("body").unwrap_or(&empty_body);
599            let message = body
600                .get("error")
601                .and_then(|m| m.as_str())
602                .unwrap_or("Unknown error");
603            self.last_registration_body = None;
604            error!("[CLIENT] ✗ Registration retry failed: {}", message);
605            Err(SdkError::RegistrationFailed(message.to_string()))
606        }
607    }
608
609    /// Deregister from FEAGI
610    fn deregister(&mut self) -> Result<()> {
611        if !self.registered && self.registration_socket.is_none() {
612            return Ok(()); // Nothing to deregister
613        }
614
615        info!("[CLIENT] Deregistering agent: {}", self.config.agent_id);
616
617        let deregistration_msg = serde_json::json!({
618            "method": "DELETE",
619            "path": "/v1/agent/deregister",
620            "body": {
621                "agent_id": self.config.agent_id,
622            }
623        });
624
625        if let Some(socket) = &self.registration_socket {
626            match self.send_request(socket, &deregistration_msg) {
627                Ok(response) => {
628                    if response.get("status").and_then(|s| s.as_u64()) == Some(200) {
629                        info!("[CLIENT] ✓ Deregistration successful");
630                    } else {
631                        warn!("[CLIENT] ⚠ Deregistration returned: {:?}", response);
632                    }
633                }
634                Err(e) => {
635                    warn!("[CLIENT] ⚠ Deregistration timeout/error: {}", e);
636                }
637            };
638        }
639
640        self.registered = false;
641        Ok(())
642    }
643
644    /// Start heartbeat service
645    fn start_heartbeat(&mut self) -> Result<()> {
646        if self.heartbeat.is_some() {
647            return Ok(());
648        }
649
650        let socket = self
651            .registration_socket
652            .as_ref()
653            .ok_or_else(|| SdkError::Other("Registration socket not initialized".to_string()))?;
654
655        let agent_type = match self.config.agent_type {
656            AgentType::Sensory => "sensory",
657            AgentType::Motor => "motor",
658            AgentType::Both => "both",
659            AgentType::Visualization => "visualization",
660            AgentType::Infrastructure => "infrastructure",
661        }
662        .to_string();
663        let capabilities = serde_json::to_value(&self.config.capabilities)
664            .map_err(|e| SdkError::Other(format!("Failed to serialize capabilities: {e}")))?;
665
666        let reconnect_spec = crate::core::heartbeat::ReconnectSpec {
667            agent_id: self.config.agent_id.clone(),
668            agent_type,
669            capabilities,
670            registration_retries: self.config.registration_retries,
671            retry_backoff_ms: self.config.retry_backoff_ms,
672        };
673
674        let mut heartbeat = HeartbeatService::new(
675            self.config.agent_id.clone(),
676            Arc::clone(socket),
677            self.runtime_handle.clone(),
678            self.runtime.clone(),
679            self.config.heartbeat_interval,
680        )
681        .with_reconnect_spec(reconnect_spec);
682
683        heartbeat.start()?;
684        self.heartbeat = Some(heartbeat);
685
686        debug!(
687            "[CLIENT] ✓ Heartbeat service started (interval: {}s)",
688            self.config.heartbeat_interval
689        );
690        Ok(())
691    }
692
693    /// Send sensory data to FEAGI
694    ///
695    /// # Arguments
696    /// * `neuron_pairs` - Vector of (neuron_id, potential) pairs
697    ///
698    /// # Example
699    /// ```ignore
700    /// client.send_sensory_data(vec![
701    ///     (0, 50.0),
702    ///     (1, 75.0),
703    ///     (2, 30.0),
704    /// ])?;
705    /// ```
706    pub fn send_sensory_data(&self, neuron_pairs: Vec<(i32, f64)>) -> Result<()> {
707        if !self.registered {
708            return Err(SdkError::NotRegistered);
709        }
710
711        let socket = self
712            .sensory_socket
713            .as_ref()
714            .ok_or_else(|| SdkError::Other("Sensory socket not initialized".to_string()))?;
715
716        // ARCHITECTURE COMPLIANCE: Use binary XYZP format, NOT JSON
717        // This serializes data using feagi_data_structures for cross-platform compatibility
718        use feagi_structures::genomic::cortical_area::CorticalID;
719        use feagi_structures::neuron_voxels::xyzp::{
720            CorticalMappedXYZPNeuronVoxels, NeuronVoxelXYZPArrays,
721        };
722
723        // Get cortical area and dimensions from vision capability
724        let vision_cap = self
725            .config
726            .capabilities
727            .vision
728            .as_ref()
729            .ok_or_else(|| SdkError::Other("No vision capability configured".to_string()))?;
730
731        let (width, _height) = vision_cap.dimensions;
732
733        // Derive cortical ID in a language-agnostic way if semantic unit+group is provided.
734        let cortical_id = if let (Some(unit), Some(group_index)) =
735            (vision_cap.unit, vision_cap.group)
736        {
737            use feagi_structures::genomic::cortical_area::io_cortical_area_configuration_flag::PercentageNeuronPositioning;
738            use feagi_structures::genomic::SensoryCorticalUnit;
739
740            let group: feagi_structures::genomic::cortical_area::descriptors::CorticalUnitIndex =
741                group_index.into();
742            let frame_change_handling = feagi_structures::genomic::cortical_area::io_cortical_area_configuration_flag::FrameChangeHandling::Absolute;
743            let percentage_neuron_positioning = PercentageNeuronPositioning::Linear;
744
745            let sensory_unit = match unit {
746                feagi_io::SensoryUnit::Infrared => SensoryCorticalUnit::Infrared,
747                feagi_io::SensoryUnit::Proximity => SensoryCorticalUnit::Proximity,
748                feagi_io::SensoryUnit::Shock => SensoryCorticalUnit::Shock,
749                feagi_io::SensoryUnit::Battery => SensoryCorticalUnit::Battery,
750                feagi_io::SensoryUnit::Servo => SensoryCorticalUnit::Servo,
751                feagi_io::SensoryUnit::AnalogGpio => SensoryCorticalUnit::AnalogGPIO,
752                feagi_io::SensoryUnit::DigitalGpio => SensoryCorticalUnit::DigitalGPIO,
753                feagi_io::SensoryUnit::MiscData => SensoryCorticalUnit::MiscData,
754                feagi_io::SensoryUnit::TextEnglishInput => SensoryCorticalUnit::TextEnglishInput,
755                feagi_io::SensoryUnit::CountInput => SensoryCorticalUnit::CountInput,
756                feagi_io::SensoryUnit::Vision => SensoryCorticalUnit::Vision,
757                feagi_io::SensoryUnit::SegmentedVision => SensoryCorticalUnit::SegmentedVision,
758                feagi_io::SensoryUnit::Accelerometer => SensoryCorticalUnit::Accelerometer,
759                feagi_io::SensoryUnit::Gyroscope => SensoryCorticalUnit::Gyroscope,
760            };
761
762            // Use the first sub-unit as the default send target for simple APIs.
763            // More advanced encoders should use the sensor cache mapping APIs instead.
764            match sensory_unit {
765                SensoryCorticalUnit::Infrared => {
766                    SensoryCorticalUnit::get_cortical_ids_array_for_infrared_with_parameters(
767                        frame_change_handling,
768                        percentage_neuron_positioning,
769                        group,
770                    )[0]
771                }
772                SensoryCorticalUnit::Proximity => {
773                    SensoryCorticalUnit::get_cortical_ids_array_for_proximity_with_parameters(
774                        frame_change_handling,
775                        percentage_neuron_positioning,
776                        group,
777                    )[0]
778                }
779                SensoryCorticalUnit::Shock => {
780                    SensoryCorticalUnit::get_cortical_ids_array_for_shock_with_parameters(
781                        frame_change_handling,
782                        percentage_neuron_positioning,
783                        group,
784                    )[0]
785                }
786                SensoryCorticalUnit::Battery => {
787                    SensoryCorticalUnit::get_cortical_ids_array_for_battery_with_parameters(
788                        frame_change_handling,
789                        percentage_neuron_positioning,
790                        group,
791                    )[0]
792                }
793                SensoryCorticalUnit::Servo => {
794                    SensoryCorticalUnit::get_cortical_ids_array_for_servo_with_parameters(
795                        frame_change_handling,
796                        percentage_neuron_positioning,
797                        group,
798                    )[0]
799                }
800                SensoryCorticalUnit::AnalogGPIO => {
801                    SensoryCorticalUnit::get_cortical_ids_array_for_analog_g_p_i_o_with_parameters(
802                        frame_change_handling,
803                        percentage_neuron_positioning,
804                        group,
805                    )[0]
806                }
807                SensoryCorticalUnit::DigitalGPIO => {
808                    SensoryCorticalUnit::get_cortical_ids_array_for_digital_g_p_i_o_with_parameters(group)[0]
809                }
810                SensoryCorticalUnit::MiscData => {
811                    SensoryCorticalUnit::get_cortical_ids_array_for_misc_data_with_parameters(
812                        frame_change_handling,
813                        group,
814                    )[0]
815                }
816                SensoryCorticalUnit::TextEnglishInput => {
817                    SensoryCorticalUnit::get_cortical_ids_array_for_text_english_input_with_parameters(
818                        frame_change_handling,
819                        group,
820                    )[0]
821                }
822                SensoryCorticalUnit::CountInput => {
823                    SensoryCorticalUnit::get_cortical_ids_array_for_count_input_with_parameters(
824                        frame_change_handling,
825                        percentage_neuron_positioning,
826                        group,
827                    )[0]
828                }
829                SensoryCorticalUnit::Vision => {
830                    SensoryCorticalUnit::get_cortical_ids_array_for_vision_with_parameters(
831                        frame_change_handling,
832                        group,
833                    )[0]
834                }
835                SensoryCorticalUnit::SegmentedVision => {
836                    SensoryCorticalUnit::get_cortical_ids_array_for_segmented_vision_with_parameters(
837                        frame_change_handling,
838                        group,
839                    )[0]
840                }
841                SensoryCorticalUnit::Accelerometer => {
842                    SensoryCorticalUnit::get_cortical_ids_array_for_accelerometer_with_parameters(
843                        frame_change_handling,
844                        percentage_neuron_positioning,
845                        group,
846                    )[0]
847                }
848                SensoryCorticalUnit::Gyroscope => {
849                    SensoryCorticalUnit::get_cortical_ids_array_for_gyroscope_with_parameters(
850                        frame_change_handling,
851                        percentage_neuron_positioning,
852                        group,
853                    )[0]
854                }
855            }
856        } else {
857            let cortical_area = &vision_cap.target_cortical_area;
858
859            // Legacy: Create CorticalID from area name
860            let mut bytes = [b' '; 8];
861            let name_bytes = cortical_area.as_bytes();
862            let copy_len = name_bytes.len().min(8);
863            bytes[..copy_len].copy_from_slice(&name_bytes[..copy_len]);
864            CorticalID::try_from_bytes(&bytes).map_err(|e| {
865                SdkError::Other(format!("Invalid cortical ID '{}': {:?}", cortical_area, e))
866            })?
867        };
868
869        // Convert flat neuron IDs to XYZP format
870        let mut x_coords = Vec::with_capacity(neuron_pairs.len());
871        let mut y_coords = Vec::with_capacity(neuron_pairs.len());
872        let mut z_coords = Vec::with_capacity(neuron_pairs.len());
873        let mut potentials = Vec::with_capacity(neuron_pairs.len());
874
875        for (neuron_id, potential) in neuron_pairs {
876            let neuron_id = neuron_id as u32;
877            x_coords.push(neuron_id % (width as u32));
878            y_coords.push(neuron_id / (width as u32));
879            z_coords.push(0); // Single channel grayscale
880            potentials.push(potential as f32);
881        }
882
883        let _neuron_count = x_coords.len(); // Reserved for future validation
884
885        // Create neuron arrays from vectors
886        let neuron_arrays =
887            NeuronVoxelXYZPArrays::new_from_vectors(x_coords, y_coords, z_coords, potentials)
888                .map_err(|e| SdkError::Other(format!("Failed to create neuron arrays: {:?}", e)))?;
889
890        // Create cortical mapped data
891        let cortical_id_log = cortical_id.as_base_64();
892        let mut cortical_mapped = CorticalMappedXYZPNeuronVoxels::new();
893        cortical_mapped.insert(cortical_id, neuron_arrays);
894
895        // Serialize to binary using FeagiByteContainer (version 2 container format)
896        let mut byte_container = feagi_serialization::FeagiByteContainer::new_empty();
897        byte_container
898            .overwrite_byte_data_with_single_struct_data(&cortical_mapped, 0)
899            .map_err(|e| SdkError::Other(format!("Failed to serialize to container: {:?}", e)))?;
900
901        let buffer = byte_container.get_byte_ref().to_vec();
902
903        // Send binary XYZP data (version 2 container format)
904        let mut socket = socket
905            .lock()
906            .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
907        let message = ZmqMessage::from(buffer.clone());
908        self.block_on(socket.send(message)).map_err(SdkError::Zmq)?;
909
910        debug!(
911            "[CLIENT] Sent {} bytes XYZP binary to {}",
912            buffer.len(),
913            cortical_id_log
914        );
915        Ok(())
916    }
917
918    /// Send pre-serialized sensory bytes to FEAGI (real-time semantics).
919    ///
920    /// This is intended for high-performance clients (e.g., Python SDK brain_input)
921    /// that already produce FeagiByteContainer bytes via Rust-side encoding caches.
922    ///
923    /// Real-time policy:
924    /// - Uses ZMQ DONTWAIT to avoid blocking the caller.
925    /// - On backpressure (EAGAIN), the message is dropped (latest-only semantics).
926    pub fn send_sensory_bytes(&self, bytes: Vec<u8>) -> Result<()> {
927        let _ = self.try_send_sensory_bytes(&bytes)?;
928        Ok(())
929    }
930
931    /// Try sending pre-serialized sensory bytes to FEAGI (non-blocking), returning whether it was sent.
932    ///
933    /// Returns:
934    /// - `Ok(true)` if the message was sent.
935    /// - `Ok(false)` if dropped due to backpressure (EAGAIN).
936    /// - `Err(...)` for other failures (not registered, socket errors).
937    pub fn try_send_sensory_bytes(&self, bytes: &[u8]) -> Result<bool> {
938        if !self.registered {
939            return Err(SdkError::NotRegistered);
940        }
941
942        let socket = self
943            .sensory_socket
944            .as_ref()
945            .ok_or_else(|| SdkError::Other("Sensory socket not initialized".to_string()))?;
946
947        let mut socket = socket
948            .lock()
949            .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
950        let message = ZmqMessage::from(bytes.to_vec());
951
952        if !self.sensory_connected.load(Ordering::Relaxed) {
953            let timeout_ms = self.config.connection_timeout_ms;
954            let send_result = self.block_on(async {
955                timeout(
956                    std::time::Duration::from_millis(timeout_ms),
957                    socket.send(message),
958                )
959                .await
960            });
961            match send_result {
962                Ok(Ok(())) => {
963                    self.sensory_connected.store(true, Ordering::Relaxed);
964                    debug!("[CLIENT] Sensory socket connected (first send)");
965                    return Ok(true);
966                }
967                Ok(Err(ZmqError::BufferFull(_))) => return Ok(false),
968                Ok(Err(e)) => {
969                    let message = e.to_string();
970                    if message.contains("Not connected to peers") {
971                        return Ok(false);
972                    }
973                    return Err(SdkError::Zmq(e));
974                }
975                Err(_) => return Ok(false),
976            }
977        }
978
979        let send_result = self.block_on(async { socket.send(message).now_or_never() });
980        match send_result {
981            Some(Ok(())) => {
982                debug!("[CLIENT] Sent {} bytes sensory (raw)", bytes.len());
983                Ok(true)
984            }
985            None | Some(Err(ZmqError::BufferFull(_))) => {
986                // REAL-TIME: Drop on pressure (do not block and do not buffer history)
987                static DROPPED: AtomicU64 = AtomicU64::new(0);
988                static LAST_LOG_MS: AtomicU64 = AtomicU64::new(0);
989
990                let dropped = DROPPED.fetch_add(1, Ordering::Relaxed) + 1;
991                let now_ms = SystemTime::now()
992                    .duration_since(SystemTime::UNIX_EPOCH)
993                    .unwrap_or_default()
994                    .as_millis() as u64;
995
996                let last_ms = LAST_LOG_MS.load(Ordering::Relaxed);
997                // Rate-limit warnings (max once per 5s) to avoid log spam on sustained pressure.
998                if now_ms.saturating_sub(last_ms) >= 5_000
999                    && LAST_LOG_MS
1000                        .compare_exchange(last_ms, now_ms, Ordering::Relaxed, Ordering::Relaxed)
1001                        .is_ok()
1002                {
1003                    warn!(
1004                        "[CLIENT] Sensory backpressure: dropped_messages={} last_payload_bytes={}",
1005                        dropped,
1006                        bytes.len()
1007                    );
1008                }
1009
1010                Ok(false)
1011            }
1012            Some(Err(e)) => {
1013                let message = e.to_string();
1014                if message.contains("Not connected to peers") {
1015                    static DROPPED: AtomicU64 = AtomicU64::new(0);
1016                    static LAST_LOG_MS: AtomicU64 = AtomicU64::new(0);
1017
1018                    let dropped = DROPPED.fetch_add(1, Ordering::Relaxed) + 1;
1019                    let now_ms = SystemTime::now()
1020                        .duration_since(SystemTime::UNIX_EPOCH)
1021                        .unwrap_or_default()
1022                        .as_millis() as u64;
1023
1024                    let last_ms = LAST_LOG_MS.load(Ordering::Relaxed);
1025                    if now_ms.saturating_sub(last_ms) >= 5_000
1026                        && LAST_LOG_MS
1027                            .compare_exchange(last_ms, now_ms, Ordering::Relaxed, Ordering::Relaxed)
1028                            .is_ok()
1029                    {
1030                        warn!(
1031                            "[CLIENT] Sensory not connected: dropped_messages={} last_payload_bytes={}",
1032                            dropped,
1033                            bytes.len()
1034                        );
1035                    }
1036
1037                    return Ok(false);
1038                }
1039
1040                Err(SdkError::Zmq(e))
1041            }
1042        }
1043    }
1044
1045    /// Receive motor data from FEAGI (non-blocking)
1046    ///
1047    /// Returns None if no data is available.
1048    /// Motor data is in binary XYZP format (CorticalMappedXYZPNeuronVoxels).
1049    ///
1050    /// # Example
1051    /// ```ignore
1052    /// use feagi_structures::neuron_voxels::xyzp::CorticalMappedXYZPNeuronVoxels;
1053    ///
1054    /// if let Some(motor_data) = client.receive_motor_data()? {
1055    ///     // Process binary motor data
1056    ///     for (cortical_id, neurons) in motor_data.iter() {
1057    ///         println!("Motor area {:?}: {} neurons", cortical_id, neurons.len());
1058    ///     }
1059    /// }
1060    /// ```
1061    pub fn receive_motor_data(
1062        &self,
1063    ) -> Result<Option<feagi_structures::neuron_voxels::xyzp::CorticalMappedXYZPNeuronVoxels>> {
1064        use feagi_structures::neuron_voxels::xyzp::CorticalMappedXYZPNeuronVoxels;
1065
1066        if !self.registered {
1067            return Err(SdkError::NotRegistered);
1068        }
1069
1070        let socket = self.motor_socket.as_ref().ok_or_else(|| {
1071            error!("[CLIENT] receive_motor_data() called but motor_socket is None");
1072            SdkError::Other("Motor socket not initialized (not a motor agent?)".to_string())
1073        })?;
1074
1075        let mut socket = socket
1076            .lock()
1077            .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
1078        let recv_result = self.block_on(async { socket.recv().now_or_never() });
1079        let message = match recv_result {
1080            None => return Ok(None),
1081            Some(Ok(message)) => message,
1082            Some(Err(e)) => return Err(SdkError::Zmq(e)),
1083        };
1084
1085        // Non-blocking receive:
1086        // - preferred multipart: [topic, data]
1087        // - legacy single-part: [data]
1088        let mut frames = message.into_vec();
1089        if frames.is_empty() {
1090            return Ok(None);
1091        }
1092        let (_topic_opt, data) = if frames.len() == 2 {
1093            let topic = frames.remove(0).to_vec();
1094            trace!(
1095                "[CLIENT] Motor multipart topic: '{}'",
1096                String::from_utf8_lossy(&topic)
1097            );
1098            let data = frames.remove(0).to_vec();
1099            trace!("[CLIENT] Received motor data frame: {} bytes", data.len());
1100            (Some(topic), data)
1101        } else if frames.len() == 1 {
1102            (None, frames.remove(0).to_vec())
1103        } else {
1104            return Err(SdkError::Other(
1105                "Unexpected multipart motor payload".to_string(),
1106            ));
1107        };
1108
1109        // Do not filter by topic here.
1110        //
1111        // FEAGI publishers have historically used different topic conventions
1112        // (agent_id, empty topic, or other routing keys). Since we subscribe to all topics,
1113        // the safest approach is to accept the motor payload regardless of topic and let
1114        // higher layers decide what to do with it.
1115
1116        // ARCHITECTURE COMPLIANCE: Deserialize binary XYZP motor data using FeagiByteContainer
1117        let mut byte_container = feagi_serialization::FeagiByteContainer::new_empty();
1118        let mut data_vec = data.to_vec();
1119
1120        // Load bytes into container
1121        byte_container
1122            .try_write_data_to_container_and_verify(&mut |bytes| {
1123                std::mem::swap(bytes, &mut data_vec);
1124                Ok(())
1125            })
1126            .map_err(|e| SdkError::Other(format!("Failed to load motor data bytes: {:?}", e)))?;
1127
1128        // Get number of structures (should be 1 for motor data)
1129        let num_structures = byte_container
1130            .try_get_number_contained_structures()
1131            .map_err(|e| SdkError::Other(format!("Failed to get structure count: {:?}", e)))?;
1132
1133        if num_structures == 0 {
1134            return Ok(None);
1135        }
1136
1137        // Extract first structure
1138        let boxed_struct = byte_container
1139            .try_create_new_struct_from_index(0)
1140            .map_err(|e| SdkError::Other(format!("Failed to extract motor structure: {:?}", e)))?;
1141
1142        // Downcast to CorticalMappedXYZPNeuronVoxels
1143        let motor_data = boxed_struct
1144            .as_any()
1145            .downcast_ref::<CorticalMappedXYZPNeuronVoxels>()
1146            .ok_or_else(|| {
1147                SdkError::Other("Motor data is not CorticalMappedXYZPNeuronVoxels".to_string())
1148            })?
1149            .clone();
1150
1151        debug!(
1152            "[CLIENT] ✓ Received motor data ({} bytes, {} areas)",
1153            data.len(),
1154            motor_data.len()
1155        );
1156        Ok(Some(motor_data))
1157    }
1158
1159    /// Receive visualization data from FEAGI (non-blocking)
1160    ///
1161    /// Returns None if no data is available.
1162    ///
1163    /// # Example
1164    /// ```ignore
1165    /// if let Some(viz_data) = client.receive_visualization_data()? {
1166    ///     // Process neural activity data
1167    ///     println!("Visualization data size: {} bytes", viz_data.len());
1168    /// }
1169    /// ```
1170    pub fn receive_visualization_data(&self) -> Result<Option<Vec<u8>>> {
1171        if !self.registered {
1172            return Err(SdkError::NotRegistered);
1173        }
1174
1175        let socket = self.viz_socket.as_ref().ok_or_else(|| {
1176            SdkError::Other(
1177                "Visualization socket not initialized (not a visualization/infrastructure agent?)"
1178                    .to_string(),
1179            )
1180        })?;
1181
1182        let mut socket = socket
1183            .lock()
1184            .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
1185        let recv_result = self.block_on(async { socket.recv().now_or_never() });
1186        match recv_result {
1187            None => Ok(None),
1188            Some(Ok(message)) => {
1189                let mut frames = message.into_vec();
1190                if frames.is_empty() {
1191                    return Ok(None);
1192                }
1193                let data = frames.pop().unwrap().to_vec();
1194                debug!(
1195                    "[CLIENT] ✓ Received visualization data ({} bytes)",
1196                    data.len()
1197                );
1198                Ok(Some(data))
1199            }
1200            Some(Err(e)) => Err(SdkError::Zmq(e)),
1201        }
1202    }
1203
1204    /// Make a REST API request to FEAGI over ZMQ
1205    ///
1206    /// # Arguments
1207    /// * `method` - HTTP method (GET, POST, PUT, DELETE)
1208    /// * `route` - API route (e.g., "/v1/system/health_check")
1209    /// * `data` - Optional request body for POST/PUT requests
1210    ///
1211    /// # Example
1212    /// ```ignore
1213    /// // GET request
1214    /// let health = client.control_request("GET", "/v1/system/health_check", None)?;
1215    ///
1216    /// // POST request
1217    /// let data = serde_json::json!({"key": "value"});
1218    /// let response = client.control_request("POST", "/v1/some/endpoint", Some(data))?;
1219    /// ```
1220    pub fn control_request(
1221        &self,
1222        method: &str,
1223        route: &str,
1224        data: Option<serde_json::Value>,
1225    ) -> Result<serde_json::Value> {
1226        if !self.registered {
1227            return Err(SdkError::NotRegistered);
1228        }
1229
1230        let socket = self.control_socket.as_ref().ok_or_else(|| {
1231            SdkError::Other(
1232                "Control socket not initialized (not an infrastructure agent?)".to_string(),
1233            )
1234        })?;
1235
1236        // Prepare REST-over-ZMQ request
1237        let mut request = serde_json::json!({
1238            "method": method,
1239            "route": route,
1240            "headers": {"content-type": "application/json"},
1241        });
1242
1243        if let Some(body) = data {
1244            request["body"] = body;
1245        }
1246
1247        let response = self.send_request(socket, &request)?;
1248        debug!("[CLIENT] ✓ Control request {} {} completed", method, route);
1249        Ok(response)
1250    }
1251
1252    /// Check if agent is registered
1253    pub fn is_registered(&self) -> bool {
1254        self.registered
1255    }
1256
1257    /// Get agent ID
1258    pub fn agent_id(&self) -> &str {
1259        &self.config.agent_id
1260    }
1261}
1262
1263impl Drop for AgentClient {
1264    fn drop(&mut self) {
1265        debug!("[CLIENT] Dropping AgentClient: {}", self.config.agent_id);
1266
1267        // Step 1: Stop heartbeat service first (this stops background threads)
1268        if let Some(mut heartbeat) = self.heartbeat.take() {
1269            debug!("[CLIENT] Stopping heartbeat service before cleanup");
1270            heartbeat.stop();
1271            debug!("[CLIENT] Heartbeat service stopped");
1272        }
1273
1274        // Step 2: Deregister from FEAGI (after threads stopped)
1275        if self.registered {
1276            debug!("[CLIENT] Deregistering agent: {}", self.config.agent_id);
1277            if let Err(e) = self.deregister() {
1278                warn!("[CLIENT] Deregistration failed during drop: {}", e);
1279                // Continue cleanup even if deregistration fails
1280            }
1281        }
1282
1283        // Step 3: Sockets will be dropped automatically
1284        debug!(
1285            "[CLIENT] AgentClient dropped cleanly: {}",
1286            self.config.agent_id
1287        );
1288    }
1289}
1290
#[cfg(test)]
mod tests {
    use super::*;
    use feagi_io::AgentType;

    /// A client built from a sensory config is created successfully, starts out
    /// unregistered, and reports the agent ID it was configured with.
    #[test]
    fn test_client_creation() {
        let config = AgentConfig::new("test_agent", AgentType::Sensory)
            .with_vision_capability("camera", (640, 480), 3, "i_vision")
            .with_registration_endpoint("tcp://localhost:8000")
            .with_sensory_endpoint("tcp://localhost:5558");

        let client =
            AgentClient::new(config).expect("AgentClient::new should accept a valid config");

        assert!(!client.is_registered());
        assert_eq!(client.agent_id(), "test_agent");
    }

    // Note: end-to-end tests need a running FEAGI instance and belong in separate
    // integration test files.
}