1use crate::config::AgentConfig;
7use crate::error::{Result, SdkError};
8use crate::heartbeat::HeartbeatService;
9use crate::reconnect::{retry_with_backoff, ReconnectionStrategy};
10use feagi_io::AgentType;
11use std::sync::{Arc, Mutex};
12use tracing::{debug, error, info, warn};
13
14pub struct AgentClient {
40 config: AgentConfig,
42
43 context: zmq::Context,
45
46 registration_socket: Option<Arc<Mutex<zmq::Socket>>>,
48
49 sensory_socket: Option<zmq::Socket>,
51
52 motor_socket: Option<zmq::Socket>,
54
55 viz_socket: Option<zmq::Socket>,
57
58 control_socket: Option<zmq::Socket>,
60
61 heartbeat: Option<HeartbeatService>,
63
64 registered: bool,
66}
67
68impl AgentClient {
69 pub fn new(config: AgentConfig) -> Result<Self> {
74 config.validate()?;
76
77 let context = zmq::Context::new();
78
79 Ok(Self {
80 config,
81 context,
82 registration_socket: None,
83 sensory_socket: None,
84 motor_socket: None,
85 viz_socket: None,
86 control_socket: None,
87 heartbeat: None,
88 registered: false,
89 })
90 }
91
92 pub fn connect(&mut self) -> Result<()> {
103 if self.registered {
104 return Err(SdkError::AlreadyConnected);
105 }
106
107 info!(
108 "[CLIENT] Connecting to FEAGI: {}",
109 self.config.registration_endpoint
110 );
111
112 let mut socket_strategy = ReconnectionStrategy::new(
114 self.config.retry_backoff_ms,
115 self.config.registration_retries,
116 );
117 retry_with_backoff(
118 || self.create_sockets(),
119 &mut socket_strategy,
120 "Socket creation",
121 )?;
122
123 let mut reg_strategy = ReconnectionStrategy::new(
125 self.config.retry_backoff_ms,
126 self.config.registration_retries,
127 );
128 retry_with_backoff(|| self.register(), &mut reg_strategy, "Registration")?;
129
130 if self.config.heartbeat_interval > 0.0 {
133 debug!("[CLIENT] Starting heartbeat service (post-registration)");
134 self.start_heartbeat()?;
135 } else {
136 debug!("[CLIENT] Heartbeat disabled (interval = 0)");
137 }
138
139 info!(
140 "[CLIENT] ✓ Connected and registered as: {}",
141 self.config.agent_id
142 );
143 Ok(())
144 }
145
146 fn create_sockets(&mut self) -> Result<()> {
148 let reg_socket = self.context.socket(zmq::REQ)?;
150 reg_socket.set_rcvtimeo(self.config.connection_timeout_ms as i32)?;
151 reg_socket.set_sndtimeo(self.config.connection_timeout_ms as i32)?;
152 reg_socket.connect(&self.config.registration_endpoint)?;
153 self.registration_socket = Some(Arc::new(Mutex::new(reg_socket)));
154
155 let sensory_socket = self.context.socket(zmq::PUSH)?;
157 sensory_socket.set_sndhwm(self.config.sensory_send_hwm)?;
158 sensory_socket.set_linger(self.config.sensory_linger_ms)?;
159 sensory_socket.set_immediate(self.config.sensory_immediate)?;
160 sensory_socket.connect(&self.config.sensory_endpoint)?;
161 self.sensory_socket = Some(sensory_socket);
162
163 if matches!(self.config.agent_type, AgentType::Motor | AgentType::Both) {
165 info!(
166 "[SDK-CONNECT] 🎮 Initializing motor socket for agent '{}' (type: {:?})",
167 self.config.agent_id, self.config.agent_type
168 );
169 info!(
170 "[SDK-CONNECT] 🎮 Motor endpoint: {}",
171 self.config.motor_endpoint
172 );
173
174 let motor_socket = self.context.socket(zmq::SUB)?;
175 motor_socket.connect(&self.config.motor_endpoint)?;
176 info!("[SDK-CONNECT] ✅ Motor socket connected");
177
178 info!(
180 "[SDK-CONNECT] 🎮 Subscribing to topic: '{}'",
181 String::from_utf8_lossy(self.config.agent_id.as_bytes())
182 );
183 motor_socket.set_subscribe(self.config.agent_id.as_bytes())?;
184 info!("[SDK-CONNECT] ✅ Motor subscription set");
185
186 self.motor_socket = Some(motor_socket);
187 info!("[SDK-CONNECT] ✅ Motor socket initialized successfully");
188 } else {
189 info!(
190 "[SDK-CONNECT] ⚠️ Motor socket NOT initialized (agent type: {:?})",
191 self.config.agent_type
192 );
193 }
194
195 if matches!(
197 self.config.agent_type,
198 AgentType::Visualization | AgentType::Infrastructure
199 ) {
200 let viz_socket = self.context.socket(zmq::SUB)?;
201 viz_socket.connect(&self.config.visualization_endpoint)?;
202
203 viz_socket.set_subscribe(b"")?;
205 self.viz_socket = Some(viz_socket);
206 debug!("[CLIENT] ✓ Visualization socket created");
207 }
208
209 if matches!(self.config.agent_type, AgentType::Infrastructure) {
211 let control_socket = self.context.socket(zmq::REQ)?;
212 control_socket.set_rcvtimeo(self.config.connection_timeout_ms as i32)?;
213 control_socket.set_sndtimeo(self.config.connection_timeout_ms as i32)?;
214 control_socket.connect(&self.config.control_endpoint)?;
215 self.control_socket = Some(control_socket);
216 debug!("[CLIENT] ✓ Control/API socket created");
217 }
218
219 debug!("[CLIENT] ✓ ZMQ sockets created");
220 Ok(())
221 }
222
223 fn register(&mut self) -> Result<()> {
225 let registration_msg = serde_json::json!({
226 "method": "POST",
227 "path": "/v1/agent/register",
228 "body": {
229 "agent_id": self.config.agent_id,
230 "agent_type": match self.config.agent_type {
231 AgentType::Sensory => "sensory",
232 AgentType::Motor => "motor",
233 AgentType::Both => "both",
234 AgentType::Visualization => "visualization",
235 AgentType::Infrastructure => "infrastructure",
236 },
237 "capabilities": self.config.capabilities,
238 }
239 });
240
241 let socket = self
242 .registration_socket
243 .as_ref()
244 .ok_or_else(|| SdkError::Other("Registration socket not initialized".to_string()))?;
245
246 let response = {
248 let socket = socket
249 .lock()
250 .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
251
252 debug!(
253 "[CLIENT] Sending registration request for: {}",
254 self.config.agent_id
255 );
256 socket.send(registration_msg.to_string().as_bytes(), 0)?;
257
258 let response_bytes = socket.recv_bytes(0)?;
260 serde_json::from_slice::<serde_json::Value>(&response_bytes)?
261 }; let status_code = response
265 .get("status")
266 .and_then(|s| s.as_u64())
267 .unwrap_or(500);
268 if status_code == 200 {
269 self.registered = true;
270 info!("[CLIENT] ✓ Registration successful: {:?}", response);
271 Ok(())
272 } else {
273 let empty_body = serde_json::json!({});
274 let body = response.get("body").unwrap_or(&empty_body);
275 let message = body
276 .get("error")
277 .and_then(|m| m.as_str())
278 .unwrap_or("Unknown error");
279
280 if message.contains("already registered") {
282 warn!("[CLIENT] ⚠ Agent already registered - attempting deregistration first");
283 self.deregister()?;
284 Err(SdkError::RegistrationFailed(
285 "Retry after deregistration".to_string(),
286 ))
287 } else {
288 error!("[CLIENT] ✗ Registration failed: {}", message);
289 Err(SdkError::RegistrationFailed(message.to_string()))
290 }
291 }
292 }
293
294 fn deregister(&mut self) -> Result<()> {
296 if !self.registered && self.registration_socket.is_none() {
297 return Ok(()); }
299
300 info!("[CLIENT] Deregistering agent: {}", self.config.agent_id);
301
302 let deregistration_msg = serde_json::json!({
303 "method": "DELETE",
304 "path": "/v1/agent/deregister",
305 "body": {
306 "agent_id": self.config.agent_id,
307 }
308 });
309
310 if let Some(socket) = &self.registration_socket {
311 let socket = socket
312 .lock()
313 .map_err(|e| SdkError::ThreadError(format!("Failed to lock socket: {}", e)))?;
314
315 if let Err(e) = socket.send(deregistration_msg.to_string().as_bytes(), 0) {
317 warn!("[CLIENT] ⚠ Failed to send deregistration: {}", e);
318 return Ok(()); }
320
321 match socket.recv_bytes(0) {
323 Ok(response_bytes) => {
324 let response: serde_json::Value = serde_json::from_slice(&response_bytes)?;
325 if response.get("status").and_then(|s| s.as_str()) == Some("success") {
326 info!("[CLIENT] ✓ Deregistration successful");
327 } else {
328 warn!("[CLIENT] ⚠ Deregistration returned: {:?}", response);
329 }
330 }
331 Err(e) => {
332 warn!("[CLIENT] ⚠ Deregistration timeout/error: {}", e);
333 }
334 }
335 }
336
337 self.registered = false;
338 Ok(())
339 }
340
341 fn start_heartbeat(&mut self) -> Result<()> {
343 if self.heartbeat.is_some() {
344 return Ok(());
345 }
346
347 let socket = self
348 .registration_socket
349 .as_ref()
350 .ok_or_else(|| SdkError::Other("Registration socket not initialized".to_string()))?;
351
352 let mut heartbeat = HeartbeatService::new(
353 self.config.agent_id.clone(),
354 Arc::clone(socket),
355 self.config.heartbeat_interval,
356 );
357
358 heartbeat.start()?;
359 self.heartbeat = Some(heartbeat);
360
361 debug!(
362 "[CLIENT] ✓ Heartbeat service started (interval: {}s)",
363 self.config.heartbeat_interval
364 );
365 Ok(())
366 }
367
368 pub fn send_sensory_data(&self, neuron_pairs: Vec<(i32, f64)>) -> Result<()> {
382 if !self.registered {
383 return Err(SdkError::NotRegistered);
384 }
385
386 let socket = self
387 .sensory_socket
388 .as_ref()
389 .ok_or_else(|| SdkError::Other("Sensory socket not initialized".to_string()))?;
390
391 use feagi_structures::genomic::cortical_area::CorticalID;
394 use feagi_structures::neuron_voxels::xyzp::{
395 CorticalMappedXYZPNeuronVoxels, NeuronVoxelXYZPArrays,
396 };
397
398 let vision_cap = self
400 .config
401 .capabilities
402 .vision
403 .as_ref()
404 .ok_or_else(|| SdkError::Other("No vision capability configured".to_string()))?;
405
406 let (width, _height) = vision_cap.dimensions;
407 let cortical_area = &vision_cap.target_cortical_area;
408
409 let mut bytes = [b' '; 8];
411 let name_bytes = cortical_area.as_bytes();
412 let copy_len = name_bytes.len().min(8);
413 bytes[..copy_len].copy_from_slice(&name_bytes[..copy_len]);
414 let cortical_id = CorticalID::try_from_bytes(&bytes).map_err(|e| {
415 SdkError::Other(format!("Invalid cortical ID '{}': {:?}", cortical_area, e))
416 })?;
417
418 let mut x_coords = Vec::with_capacity(neuron_pairs.len());
420 let mut y_coords = Vec::with_capacity(neuron_pairs.len());
421 let mut z_coords = Vec::with_capacity(neuron_pairs.len());
422 let mut potentials = Vec::with_capacity(neuron_pairs.len());
423
424 for (neuron_id, potential) in neuron_pairs {
425 let neuron_id = neuron_id as u32;
426 x_coords.push(neuron_id % (width as u32));
427 y_coords.push(neuron_id / (width as u32));
428 z_coords.push(0); potentials.push(potential as f32);
430 }
431
432 let _neuron_count = x_coords.len(); let neuron_arrays =
436 NeuronVoxelXYZPArrays::new_from_vectors(x_coords, y_coords, z_coords, potentials)
437 .map_err(|e| SdkError::Other(format!("Failed to create neuron arrays: {:?}", e)))?;
438
439 let mut cortical_mapped = CorticalMappedXYZPNeuronVoxels::new();
441 cortical_mapped.insert(cortical_id, neuron_arrays);
442
443 let mut byte_container = feagi_serialization::FeagiByteContainer::new_empty();
445 byte_container
446 .overwrite_byte_data_with_single_struct_data(&cortical_mapped, 0)
447 .map_err(|e| SdkError::Other(format!("Failed to serialize to container: {:?}", e)))?;
448
449 let buffer = byte_container.get_byte_ref().to_vec();
450
451 socket.send(&buffer, 0)?;
453
454 debug!(
455 "[CLIENT] Sent {} bytes XYZP binary to {}",
456 buffer.len(),
457 cortical_area
458 );
459 Ok(())
460 }
461
462 pub fn receive_motor_data(
479 &self,
480 ) -> Result<Option<feagi_structures::neuron_voxels::xyzp::CorticalMappedXYZPNeuronVoxels>> {
481 use feagi_structures::neuron_voxels::xyzp::CorticalMappedXYZPNeuronVoxels;
482
483 if !self.registered {
484 return Err(SdkError::NotRegistered);
485 }
486
487 let socket = self.motor_socket.as_ref().ok_or_else(|| {
488 info!("[CLIENT] ❌ receive_motor_data() called but motor_socket is None!");
489 SdkError::Other("Motor socket not initialized (not a motor agent?)".to_string())
490 })?;
491
492 match socket.recv_bytes(zmq::DONTWAIT) {
495 Ok(topic) => {
496 info!(
497 "[CLIENT] 📥 Received first frame: {} bytes: '{}'",
498 topic.len(),
499 String::from_utf8_lossy(&topic)
500 );
501
502 if topic != self.config.agent_id.as_bytes() {
504 info!(
505 "[CLIENT] ⚠️ Received motor data for different agent: expected '{}', got '{}'",
506 self.config.agent_id,
507 String::from_utf8_lossy(&topic)
508 );
509 return Ok(None);
510 }
511
512 let data = if socket.get_rcvmore().map_err(SdkError::Zmq)? {
514 info!(
515 "[CLIENT] 📥 More frames available, receiving second frame (motor data)..."
516 );
517 let data = socket.recv_bytes(0).map_err(|e| {
519 info!("[CLIENT] ❌ Failed to receive second frame: {}", e);
520 SdkError::Zmq(e)
521 })?;
522 info!(
523 "[CLIENT] 📥 Received motor data frame: {} bytes",
524 data.len()
525 );
526 data
527 } else {
528 info!("[CLIENT] ⚠️ NO MORE FRAMES! Old FEAGI (single-part message)");
529 info!(
530 "[CLIENT] 📥 Using first frame as motor data ({} bytes)",
531 topic.len()
532 );
533 topic
535 };
536
537 let mut byte_container = feagi_serialization::FeagiByteContainer::new_empty();
539 let mut data_vec = data.to_vec();
540
541 byte_container
543 .try_write_data_to_container_and_verify(&mut |bytes| {
544 std::mem::swap(bytes, &mut data_vec);
545 Ok(())
546 })
547 .map_err(|e| {
548 SdkError::Other(format!("Failed to load motor data bytes: {:?}", e))
549 })?;
550
551 let num_structures = byte_container
553 .try_get_number_contained_structures()
554 .map_err(|e| {
555 SdkError::Other(format!("Failed to get structure count: {:?}", e))
556 })?;
557
558 if num_structures == 0 {
559 return Ok(None);
560 }
561
562 let boxed_struct =
564 byte_container
565 .try_create_new_struct_from_index(0)
566 .map_err(|e| {
567 SdkError::Other(format!("Failed to extract motor structure: {:?}", e))
568 })?;
569
570 let motor_data = boxed_struct
572 .as_any()
573 .downcast_ref::<CorticalMappedXYZPNeuronVoxels>()
574 .ok_or_else(|| {
575 SdkError::Other(
576 "Motor data is not CorticalMappedXYZPNeuronVoxels".to_string(),
577 )
578 })?
579 .clone();
580
581 debug!(
582 "[CLIENT] ✓ Received motor data ({} bytes, {} areas)",
583 data.len(),
584 motor_data.len()
585 );
586 Ok(Some(motor_data))
587 }
588 Err(zmq::Error::EAGAIN) => {
589 Ok(None)
591 }
592 Err(e) => {
593 error!("[CLIENT] ❌ ZMQ error on motor receive: {}", e);
594 Err(SdkError::Zmq(e))
595 }
596 }
597 }
598
599 pub fn receive_visualization_data(&self) -> Result<Option<Vec<u8>>> {
611 if !self.registered {
612 return Err(SdkError::NotRegistered);
613 }
614
615 let socket = self.viz_socket.as_ref().ok_or_else(|| {
616 SdkError::Other(
617 "Visualization socket not initialized (not a visualization/infrastructure agent?)"
618 .to_string(),
619 )
620 })?;
621
622 match socket.recv_bytes(zmq::DONTWAIT) {
624 Ok(data) => {
625 debug!(
626 "[CLIENT] ✓ Received visualization data ({} bytes)",
627 data.len()
628 );
629 Ok(Some(data))
630 }
631 Err(zmq::Error::EAGAIN) => Ok(None), Err(e) => Err(SdkError::Zmq(e)),
633 }
634 }
635
636 pub fn control_request(
653 &self,
654 method: &str,
655 route: &str,
656 data: Option<serde_json::Value>,
657 ) -> Result<serde_json::Value> {
658 if !self.registered {
659 return Err(SdkError::NotRegistered);
660 }
661
662 let socket = self.control_socket.as_ref().ok_or_else(|| {
663 SdkError::Other(
664 "Control socket not initialized (not an infrastructure agent?)".to_string(),
665 )
666 })?;
667
668 let mut request = serde_json::json!({
670 "method": method,
671 "route": route,
672 "headers": {"content-type": "application/json"},
673 });
674
675 if let Some(body) = data {
676 request["body"] = body;
677 }
678
679 socket.send(request.to_string().as_bytes(), 0)?;
681
682 let response_bytes = socket.recv_bytes(0)?;
684 let response: serde_json::Value = serde_json::from_slice(&response_bytes)?;
685
686 debug!("[CLIENT] ✓ Control request {} {} completed", method, route);
687 Ok(response)
688 }
689
690 pub fn is_registered(&self) -> bool {
692 self.registered
693 }
694
695 pub fn agent_id(&self) -> &str {
697 &self.config.agent_id
698 }
699}
700
701impl Drop for AgentClient {
702 fn drop(&mut self) {
703 debug!("[CLIENT] Dropping AgentClient: {}", self.config.agent_id);
704
705 if let Some(mut heartbeat) = self.heartbeat.take() {
707 debug!("[CLIENT] Stopping heartbeat service before cleanup");
708 heartbeat.stop();
709 debug!("[CLIENT] Heartbeat service stopped");
710 }
711
712 if self.registered {
714 debug!("[CLIENT] Deregistering agent: {}", self.config.agent_id);
715 if let Err(e) = self.deregister() {
716 warn!("[CLIENT] Deregistration failed during drop: {}", e);
717 }
719 }
720
721 debug!(
723 "[CLIENT] AgentClient dropped cleanly: {}",
724 self.config.agent_id
725 );
726 }
727}
728
#[cfg(test)]
mod tests {
    use super::*;
    use feagi_io::AgentType;

    /// A client built from a valid config keeps its agent id and starts out
    /// unregistered.
    #[test]
    fn test_client_creation() {
        let config = AgentConfig::new("test_agent", AgentType::Sensory)
            .with_vision_capability("camera", (640, 480), 3, "i_vision")
            .with_registration_endpoint("tcp://localhost:8000")
            .with_sensory_endpoint("tcp://localhost:5558");

        let client = AgentClient::new(config)
            .expect("client construction should succeed for a valid config");

        assert_eq!(client.agent_id(), "test_agent");
        assert!(!client.is_registered());
    }
}