feagi_agent/sdk/agents/
connector_agent.rs1use crate::sdk::client::communication::auth_request::AuthRequest;
2use crate::sdk::client::communication::registration_request::RegistrationRequest;
3use crate::sdk::common::{
4 AgentCapabilities, AgentConnectionState, AgentDescriptor, AuthToken, FeagiAgent,
5};
6use feagi_io::io_api::implementations::zmq::{
7 FEAGIZMQClientPusherProperties, FEAGIZMQClientSubscriberProperties,
8};
9use feagi_io::io_api::traits_and_enums::client::{
10 client_shared::FeagiClientConnectionStateChange, FeagiClientPusher,
11 FeagiClientRequesterProperties, FeagiClientSubscriber,
12};
13use feagi_io::io_api::traits_and_enums::client::{
14 FeagiClientPusherProperties as _, FeagiClientSubscriberProperties as _,
15};
16use feagi_sensorimotor::caching::{MotorDeviceCache, SensorDeviceCache};
17use feagi_sensorimotor::feedbacks::{FeedBackRegistration, FeedbackRegistrationTargets};
18use feagi_sensorimotor::ConnectorCache;
19use feagi_structures::FeagiDataError;
20use std::collections::HashMap;
21use std::sync::{Arc, Mutex, MutexGuard};
22use std::time::Duration;
23
24pub struct ConnectorAgent {
25 agent_id: AgentDescriptor,
26 current_connection_state: AgentConnectionState,
27 connector_cache: ConnectorCache,
28 sensor_sender: Option<Box<dyn FeagiClientPusher>>,
29 motor_receiver: Option<Box<dyn FeagiClientSubscriber>>,
30}
31
32impl ConnectorAgent {
33 pub fn new_empty(agent_id: AgentDescriptor) -> Self {
34 ConnectorAgent {
35 agent_id,
36 current_connection_state: AgentConnectionState::Disconnected,
37 connector_cache: ConnectorCache::new(),
38 sensor_sender: None,
39 motor_receiver: None,
40 }
41 }
42
43 pub fn new_from_device_registration_json(
44 agent_id: AgentDescriptor,
45 json: serde_json::Value,
46 ) -> Result<Self, FeagiDataError> {
47 let mut agent = Self::new_empty(agent_id);
48 agent.set_device_registrations_from_json(json)?;
49 Ok(agent)
50 }
51
52 pub fn get_sensor_cache(&self) -> MutexGuard<'_, SensorDeviceCache> {
53 self.connector_cache.get_sensor_cache()
54 }
55
56 pub fn get_sensor_cache_ref(&self) -> Arc<Mutex<SensorDeviceCache>> {
57 self.connector_cache.get_sensor_cache_ref()
58 }
59
60 pub fn get_motor_cache(&self) -> MutexGuard<'_, MotorDeviceCache> {
61 self.connector_cache.get_motor_cache()
62 }
63
64 pub fn get_motor_cache_ref(&self) -> Arc<Mutex<MotorDeviceCache>> {
65 self.connector_cache.get_motor_cache_ref()
66 }
67
68 pub fn get_device_registration_json(&self) -> Result<serde_json::Value, FeagiDataError> {
69 self.connector_cache
70 .export_device_registrations_as_config_json()
71 }
72
73 pub fn register_feedback(
74 &mut self,
75 feedback: FeedBackRegistration,
76 target: FeedbackRegistrationTargets,
77 ) -> Result<(), FeagiDataError> {
78 self.connector_cache.register_feedback(feedback, target)
79 }
80
81 pub fn set_device_registrations_from_json(
82 &mut self,
83 json: serde_json::Value,
84 ) -> Result<(), FeagiDataError> {
85 if self.current_connection_state().is_active() {
86 return Err(FeagiDataError::ResourceLockedWhileRunning(
87 "Cannot reload device registrations while running!".to_string(),
88 ));
89 }
90 self.connector_cache
91 .import_device_registrations_as_config_json(json)
92 }
93
94 pub fn send_sensor_data(&self) -> Result<(), FeagiDataError> {
95 if !self.current_connection_state().is_active() {
96 return Err(FeagiDataError::ResourceLockedWhileRunning(
97 "Cannot reload device registrations while running!".to_string(),
98 ));
99 }
100 if self.sensor_sender.is_none() {
101 return Err(FeagiDataError::ResourceLockedWhileRunning(
102 "Cannot reload device registrations while running!".to_string(),
103 ));
104 }
105 let Some(sender) = self.sensor_sender.as_ref() else {
106 return Err(FeagiDataError::ResourceLockedWhileRunning(
107 "Cannot reload device registrations while running!".to_string(),
108 ));
109 };
110
111 let mut sensor_cache = self.get_sensor_cache();
112 let _ = sensor_cache.encode_neurons_to_bytes();
113 let bytes = sensor_cache.get_feagi_byte_container();
114 sender.push_data(bytes.get_byte_ref());
115 Ok(())
116 }
117}
118
119impl FeagiAgent for ConnectorAgent {
120 fn agent_id(&self) -> &AgentDescriptor {
121 &self.agent_id
122 }
123
124 fn current_connection_state(&self) -> &AgentConnectionState {
125 &self.current_connection_state
126 }
127
128 fn agent_capabilities(&self) -> &[AgentCapabilities] {
129 &[
130 AgentCapabilities::SendSensorData,
131 AgentCapabilities::ReceiveMotorData,
132 ]
133 }
134
135 fn connect_to_feagi(
136 &mut self,
137 host: String,
138 requester_properties: Box<dyn FeagiClientRequesterProperties>,
139 agent_descriptor: AgentDescriptor,
140 ) -> Result<(), FeagiDataError> {
141 if self.current_connection_state().is_active() {
142 return Err(FeagiDataError::ResourceLockedWhileRunning(
143 "Cannot try to connect to FEAGI while a connection is active!"
144 .parse()
145 .unwrap(),
146 ));
147 }
148
149 self.current_connection_state = AgentConnectionState::Connecting;
150
151 let mut requester =
152 requester_properties.build(Box::new(|_change: FeagiClientConnectionStateChange| {
153 }));
155
156 requester.connect(&host).map_err(|e| {
157 FeagiDataError::InternalError(format!("Failed to connect requester: {}", e))
158 })?;
159
160 self.current_connection_state = AgentConnectionState::Authenticating;
161
162 let auth_request = AuthRequest::new(
163 &agent_descriptor,
164 &AuthToken::new([0; 32]), );
166 let auth_bytes = serde_json::to_vec(&auth_request.to_json())
167 .map_err(|e| FeagiDataError::SerializationError(e.to_string()))?;
168 requester.send_request(&auth_bytes).map_err(|e| {
169 FeagiDataError::InternalError(format!("Failed to send auth request: {}", e))
170 })?;
171
172 let phase1_data = loop {
173 if let Some(data) = requester.try_poll_receive().map_err(|e| {
174 FeagiDataError::InternalError(format!("Failed to read phase 1 response: {}", e))
175 })? {
176 break data.to_vec();
177 }
178 std::thread::sleep(Duration::from_millis(1));
179 };
180
181 let phase1_json: serde_json::Value = serde_json::from_slice(&phase1_data)
182 .map_err(|e| FeagiDataError::DeserializationError(e.to_string()))?;
183 let connection_id = phase1_json
184 .get("connection_id")
185 .and_then(|v| v.as_str())
186 .ok_or_else(|| {
187 FeagiDataError::DeserializationError(
188 "Missing connection_id in phase 1 response".to_string(),
189 )
190 })?
191 .to_string();
192
193 let registration_request = RegistrationRequest {
194 connection_id,
195 data: serde_json::json!({}),
196 capabilities: self.agent_capabilities().to_vec(),
197 };
198 let registration_bytes = serde_json::to_vec(®istration_request)
199 .map_err(|e| FeagiDataError::SerializationError(e.to_string()))?;
200 requester.send_request(®istration_bytes).map_err(|e| {
201 FeagiDataError::InternalError(format!("Failed to send registration request: {}", e))
202 })?;
203
204 let phase2_data = loop {
205 if let Some(data) = requester.try_poll_receive().map_err(|e| {
206 FeagiDataError::InternalError(format!("Failed to read phase 2 response: {}", e))
207 })? {
208 break data.to_vec();
209 }
210 std::thread::sleep(Duration::from_millis(1));
211 };
212
213 let phase2_json: serde_json::Value = serde_json::from_slice(&phase2_data)
214 .map_err(|e| FeagiDataError::DeserializationError(e.to_string()))?;
215 let endpoints = Self::parse_phase2_endpoints(&phase2_json)?;
216
217 let capabilities = self.agent_capabilities().to_vec();
218 for capability in capabilities {
219 let endpoint = endpoints.get(&capability).ok_or_else(|| {
220 FeagiDataError::DeserializationError(format!(
221 "Missing endpoint for capability {:?}",
222 capability
223 ))
224 })?;
225
226 match capability {
227 AgentCapabilities::SendSensorData => {
228 let mut sensor_sender =
229 Box::new(FEAGIZMQClientPusherProperties::new(endpoint.to_string())).build(
230 Box::new(|_change: FeagiClientConnectionStateChange| {
231 }),
233 );
234 sensor_sender.connect(endpoint).map_err(|e| {
235 FeagiDataError::InternalError(format!(
236 "Failed to connect sensor sender: {}",
237 e
238 ))
239 })?;
240 self.sensor_sender = Some(sensor_sender);
241 }
242 AgentCapabilities::ReceiveMotorData => {
243 let mut motor_receiver = Box::new(FEAGIZMQClientSubscriberProperties::new(
244 endpoint.to_string(),
245 ))
246 .build(Box::new(
247 |_change: FeagiClientConnectionStateChange| {
248 },
250 ));
251 motor_receiver.connect(endpoint).map_err(|e| {
252 FeagiDataError::InternalError(format!(
253 "Failed to connect motor receiver: {}",
254 e
255 ))
256 })?;
257 self.motor_receiver = Some(motor_receiver);
258 }
259 AgentCapabilities::ReceiveNeuronVisualizations => {
260 }
262 }
263 }
264
265 self.current_connection_state = AgentConnectionState::Running;
266 Ok(())
267 }
268
269 fn disconnect(&mut self) {
270 self.current_connection_state = AgentConnectionState::Disconnected;
272 }
273}
274
275impl ConnectorAgent {
276 fn parse_phase2_endpoints(
277 phase2_json: &serde_json::Value,
278 ) -> Result<HashMap<AgentCapabilities, String>, FeagiDataError> {
279 let endpoints = phase2_json
280 .get("endpoints")
281 .and_then(|v| v.as_object())
282 .ok_or_else(|| {
283 FeagiDataError::DeserializationError("Missing or invalid endpoints".to_string())
284 })?;
285
286 let mut parsed = HashMap::new();
287 for (key, value) in endpoints {
288 let endpoint = value.as_str().ok_or_else(|| {
289 FeagiDataError::DeserializationError(format!(
290 "Endpoint for {} must be a string",
291 key
292 ))
293 })?;
294 let capability = match key.as_str() {
295 "send_sensor_data" => AgentCapabilities::SendSensorData,
296 "receive_motor_data" => AgentCapabilities::ReceiveMotorData,
297 "receive_neuron_visualizations" => AgentCapabilities::ReceiveNeuronVisualizations,
298 _ => {
299 return Err(FeagiDataError::DeserializationError(format!(
300 "Unknown capability key: {}",
301 key
302 )));
303 }
304 };
305 parsed.insert(capability, endpoint.to_string());
306 }
307
308 Ok(parsed)
309 }
310}