Skip to main content

feagi_agent/sdk/agents/
connector_agent.rs

1use crate::sdk::client::communication::auth_request::AuthRequest;
2use crate::sdk::client::communication::registration_request::RegistrationRequest;
3use crate::sdk::common::{
4    AgentCapabilities, AgentConnectionState, AgentDescriptor, AuthToken, FeagiAgent,
5};
6use feagi_io::io_api::implementations::zmq::{
7    FEAGIZMQClientPusherProperties, FEAGIZMQClientSubscriberProperties,
8};
9use feagi_io::io_api::traits_and_enums::client::{
10    client_shared::FeagiClientConnectionStateChange, FeagiClientPusher,
11    FeagiClientRequesterProperties, FeagiClientSubscriber,
12};
13use feagi_io::io_api::traits_and_enums::client::{
14    FeagiClientPusherProperties as _, FeagiClientSubscriberProperties as _,
15};
16use feagi_sensorimotor::caching::{MotorDeviceCache, SensorDeviceCache};
17use feagi_sensorimotor::feedbacks::{FeedBackRegistration, FeedbackRegistrationTargets};
18use feagi_sensorimotor::ConnectorCache;
19use feagi_structures::FeagiDataError;
20use std::collections::HashMap;
21use std::sync::{Arc, Mutex, MutexGuard};
22use std::time::Duration;
23
24pub struct ConnectorAgent {
25    agent_id: AgentDescriptor,
26    current_connection_state: AgentConnectionState,
27    connector_cache: ConnectorCache,
28    sensor_sender: Option<Box<dyn FeagiClientPusher>>,
29    motor_receiver: Option<Box<dyn FeagiClientSubscriber>>,
30}
31
32impl ConnectorAgent {
33    pub fn new_empty(agent_id: AgentDescriptor) -> Self {
34        ConnectorAgent {
35            agent_id,
36            current_connection_state: AgentConnectionState::Disconnected,
37            connector_cache: ConnectorCache::new(),
38            sensor_sender: None,
39            motor_receiver: None,
40        }
41    }
42
43    pub fn new_from_device_registration_json(
44        agent_id: AgentDescriptor,
45        json: serde_json::Value,
46    ) -> Result<Self, FeagiDataError> {
47        let mut agent = Self::new_empty(agent_id);
48        agent.set_device_registrations_from_json(json)?;
49        Ok(agent)
50    }
51
52    pub fn get_sensor_cache(&self) -> MutexGuard<'_, SensorDeviceCache> {
53        self.connector_cache.get_sensor_cache()
54    }
55
56    pub fn get_sensor_cache_ref(&self) -> Arc<Mutex<SensorDeviceCache>> {
57        self.connector_cache.get_sensor_cache_ref()
58    }
59
60    pub fn get_motor_cache(&self) -> MutexGuard<'_, MotorDeviceCache> {
61        self.connector_cache.get_motor_cache()
62    }
63
64    pub fn get_motor_cache_ref(&self) -> Arc<Mutex<MotorDeviceCache>> {
65        self.connector_cache.get_motor_cache_ref()
66    }
67
68    pub fn get_device_registration_json(&self) -> Result<serde_json::Value, FeagiDataError> {
69        self.connector_cache
70            .export_device_registrations_as_config_json()
71    }
72
73    pub fn register_feedback(
74        &mut self,
75        feedback: FeedBackRegistration,
76        target: FeedbackRegistrationTargets,
77    ) -> Result<(), FeagiDataError> {
78        self.connector_cache.register_feedback(feedback, target)
79    }
80
81    pub fn set_device_registrations_from_json(
82        &mut self,
83        json: serde_json::Value,
84    ) -> Result<(), FeagiDataError> {
85        if self.current_connection_state().is_active() {
86            return Err(FeagiDataError::ResourceLockedWhileRunning(
87                "Cannot reload device registrations while running!".to_string(),
88            ));
89        }
90        self.connector_cache
91            .import_device_registrations_as_config_json(json)
92    }
93
94    pub fn send_sensor_data(&self) -> Result<(), FeagiDataError> {
95        if !self.current_connection_state().is_active() {
96            return Err(FeagiDataError::ResourceLockedWhileRunning(
97                "Cannot reload device registrations while running!".to_string(),
98            ));
99        }
100        if self.sensor_sender.is_none() {
101            return Err(FeagiDataError::ResourceLockedWhileRunning(
102                "Cannot reload device registrations while running!".to_string(),
103            ));
104        }
105        let Some(sender) = self.sensor_sender.as_ref() else {
106            return Err(FeagiDataError::ResourceLockedWhileRunning(
107                "Cannot reload device registrations while running!".to_string(),
108            ));
109        };
110
111        let mut sensor_cache = self.get_sensor_cache();
112        let _ = sensor_cache.encode_neurons_to_bytes();
113        let bytes = sensor_cache.get_feagi_byte_container();
114        sender.push_data(bytes.get_byte_ref());
115        Ok(())
116    }
117}
118
119impl FeagiAgent for ConnectorAgent {
120    fn agent_id(&self) -> &AgentDescriptor {
121        &self.agent_id
122    }
123
124    fn current_connection_state(&self) -> &AgentConnectionState {
125        &self.current_connection_state
126    }
127
128    fn agent_capabilities(&self) -> &[AgentCapabilities] {
129        &[
130            AgentCapabilities::SendSensorData,
131            AgentCapabilities::ReceiveMotorData,
132        ]
133    }
134
135    fn connect_to_feagi(
136        &mut self,
137        host: String,
138        requester_properties: Box<dyn FeagiClientRequesterProperties>,
139        agent_descriptor: AgentDescriptor,
140    ) -> Result<(), FeagiDataError> {
141        if self.current_connection_state().is_active() {
142            return Err(FeagiDataError::ResourceLockedWhileRunning(
143                "Cannot try to connect to FEAGI while a connection is active!"
144                    .parse()
145                    .unwrap(),
146            ));
147        }
148
149        self.current_connection_state = AgentConnectionState::Connecting;
150
151        let mut requester =
152            requester_properties.build(Box::new(|_change: FeagiClientConnectionStateChange| {
153                // TODO: track requester connection state changes
154            }));
155
156        requester.connect(&host).map_err(|e| {
157            FeagiDataError::InternalError(format!("Failed to connect requester: {}", e))
158        })?;
159
160        self.current_connection_state = AgentConnectionState::Authenticating;
161
162        let auth_request = AuthRequest::new(
163            &agent_descriptor,
164            &AuthToken::new([0; 32]), // TODO: actual token
165        );
166        let auth_bytes = serde_json::to_vec(&auth_request.to_json())
167            .map_err(|e| FeagiDataError::SerializationError(e.to_string()))?;
168        requester.send_request(&auth_bytes).map_err(|e| {
169            FeagiDataError::InternalError(format!("Failed to send auth request: {}", e))
170        })?;
171
172        let phase1_data = loop {
173            if let Some(data) = requester.try_poll_receive().map_err(|e| {
174                FeagiDataError::InternalError(format!("Failed to read phase 1 response: {}", e))
175            })? {
176                break data.to_vec();
177            }
178            std::thread::sleep(Duration::from_millis(1));
179        };
180
181        let phase1_json: serde_json::Value = serde_json::from_slice(&phase1_data)
182            .map_err(|e| FeagiDataError::DeserializationError(e.to_string()))?;
183        let connection_id = phase1_json
184            .get("connection_id")
185            .and_then(|v| v.as_str())
186            .ok_or_else(|| {
187                FeagiDataError::DeserializationError(
188                    "Missing connection_id in phase 1 response".to_string(),
189                )
190            })?
191            .to_string();
192
193        let registration_request = RegistrationRequest {
194            connection_id,
195            data: serde_json::json!({}),
196            capabilities: self.agent_capabilities().to_vec(),
197        };
198        let registration_bytes = serde_json::to_vec(&registration_request)
199            .map_err(|e| FeagiDataError::SerializationError(e.to_string()))?;
200        requester.send_request(&registration_bytes).map_err(|e| {
201            FeagiDataError::InternalError(format!("Failed to send registration request: {}", e))
202        })?;
203
204        let phase2_data = loop {
205            if let Some(data) = requester.try_poll_receive().map_err(|e| {
206                FeagiDataError::InternalError(format!("Failed to read phase 2 response: {}", e))
207            })? {
208                break data.to_vec();
209            }
210            std::thread::sleep(Duration::from_millis(1));
211        };
212
213        let phase2_json: serde_json::Value = serde_json::from_slice(&phase2_data)
214            .map_err(|e| FeagiDataError::DeserializationError(e.to_string()))?;
215        let endpoints = Self::parse_phase2_endpoints(&phase2_json)?;
216
217        let capabilities = self.agent_capabilities().to_vec();
218        for capability in capabilities {
219            let endpoint = endpoints.get(&capability).ok_or_else(|| {
220                FeagiDataError::DeserializationError(format!(
221                    "Missing endpoint for capability {:?}",
222                    capability
223                ))
224            })?;
225
226            match capability {
227                AgentCapabilities::SendSensorData => {
228                    let mut sensor_sender =
229                        Box::new(FEAGIZMQClientPusherProperties::new(endpoint.to_string())).build(
230                            Box::new(|_change: FeagiClientConnectionStateChange| {
231                                // TODO: track sensor sender connection state changes
232                            }),
233                        );
234                    sensor_sender.connect(endpoint).map_err(|e| {
235                        FeagiDataError::InternalError(format!(
236                            "Failed to connect sensor sender: {}",
237                            e
238                        ))
239                    })?;
240                    self.sensor_sender = Some(sensor_sender);
241                }
242                AgentCapabilities::ReceiveMotorData => {
243                    let mut motor_receiver = Box::new(FEAGIZMQClientSubscriberProperties::new(
244                        endpoint.to_string(),
245                    ))
246                    .build(Box::new(
247                        |_change: FeagiClientConnectionStateChange| {
248                            // TODO: track motor receiver connection state changes
249                        },
250                    ));
251                    motor_receiver.connect(endpoint).map_err(|e| {
252                        FeagiDataError::InternalError(format!(
253                            "Failed to connect motor receiver: {}",
254                            e
255                        ))
256                    })?;
257                    self.motor_receiver = Some(motor_receiver);
258                }
259                AgentCapabilities::ReceiveNeuronVisualizations => {
260                    // TODO: initialize visualization stream
261                }
262            }
263        }
264
265        self.current_connection_state = AgentConnectionState::Running;
266        Ok(())
267    }
268
269    fn disconnect(&mut self) {
270        // TODO: close client connections and update state
271        self.current_connection_state = AgentConnectionState::Disconnected;
272    }
273}
274
275impl ConnectorAgent {
276    fn parse_phase2_endpoints(
277        phase2_json: &serde_json::Value,
278    ) -> Result<HashMap<AgentCapabilities, String>, FeagiDataError> {
279        let endpoints = phase2_json
280            .get("endpoints")
281            .and_then(|v| v.as_object())
282            .ok_or_else(|| {
283                FeagiDataError::DeserializationError("Missing or invalid endpoints".to_string())
284            })?;
285
286        let mut parsed = HashMap::new();
287        for (key, value) in endpoints {
288            let endpoint = value.as_str().ok_or_else(|| {
289                FeagiDataError::DeserializationError(format!(
290                    "Endpoint for {} must be a string",
291                    key
292                ))
293            })?;
294            let capability = match key.as_str() {
295                "send_sensor_data" => AgentCapabilities::SendSensorData,
296                "receive_motor_data" => AgentCapabilities::ReceiveMotorData,
297                "receive_neuron_visualizations" => AgentCapabilities::ReceiveNeuronVisualizations,
298                _ => {
299                    return Err(FeagiDataError::DeserializationError(format!(
300                        "Unknown capability key: {}",
301                        key
302                    )));
303                }
304            };
305            parsed.insert(capability, endpoint.to_string());
306        }
307
308        Ok(parsed)
309    }
310}