feagi_agent/clients/blocking/
command_control_agent.rs1use crate::command_and_control::agent_registration_message::{
2 AgentRegistrationMessage, DeregistrationRequest, DeregistrationResponse, RegistrationRequest,
3 RegistrationResponse,
4};
5use crate::command_and_control::FeagiMessage;
6use crate::{AgentCapabilities, AgentDescriptor, AuthToken, FeagiAgentError};
7use feagi_io::traits_and_enums::client::{FeagiClientRequester, FeagiClientRequesterProperties};
8use feagi_io::traits_and_enums::shared::{FeagiEndpointState, TransportProtocolEndpoint};
9use feagi_io::AgentID;
10use feagi_serialization::FeagiByteContainer;
11use std::collections::HashMap;
12
13pub struct CommandControlAgent {
14 properties: Box<dyn FeagiClientRequesterProperties>,
15 requester: Option<Box<dyn FeagiClientRequester>>,
16 request_buffer: FeagiByteContainer,
17 send_buffer: FeagiByteContainer,
18 registration_status: AgentRegistrationStatus,
19}
20
21impl CommandControlAgent {
22 pub fn new(endpoint_properties: Box<dyn FeagiClientRequesterProperties>) -> Self {
23 Self {
24 registration_status: AgentRegistrationStatus::NotRegistered,
25 properties: endpoint_properties,
26 requester: None,
27 request_buffer: FeagiByteContainer::new_empty(),
28 send_buffer: FeagiByteContainer::new_empty(),
29 }
30 }
31
32 pub fn registration_status(&self) -> &AgentRegistrationStatus {
34 &self.registration_status
35 }
36
37 pub fn registered_endpoint_target(&mut self) -> TransportProtocolEndpoint {
38 self.properties.get_endpoint_target()
39 }
40 pub fn request_connect(&mut self) -> Result<(), FeagiAgentError> {
45 if self.registration_status != AgentRegistrationStatus::NotRegistered {
46 return Err(FeagiAgentError::ConnectionFailed(
47 "Agent already connected and registered!".to_string(),
48 ));
49 }
50
51 if self.requester.is_none() {
52 self.requester = Some(self.properties.as_boxed_client_requester());
53 }
54
55 let mut requester = self.requester.take().unwrap();
56
57 match requester.poll() {
58 FeagiEndpointState::Inactive => {
59 requester.request_connect()?;
60 self.requester = Some(requester);
61 Ok(())
62 }
63 _ => {
64 self.requester = Some(requester);
65 Err(FeagiAgentError::ConnectionFailed(
66 "Socket is already active!".to_string(),
67 ))
68 }
69 }
70 }
71
72 pub fn request_registration(
73 &mut self,
74 agent_descriptor: AgentDescriptor,
75 auth_token: AuthToken,
76 requested_capabilities: Vec<AgentCapabilities>,
77 ) -> Result<(), FeagiAgentError> {
78 let transport_protocol = if let Some(requester) = &mut self.requester {
79 requester
80 .get_endpoint_target()
81 .as_transport_protocol_implementation()
82 } else {
83 return Err(FeagiAgentError::ConnectionFailed(
84 "Cannot register to endpoint when not connected!".to_string(),
85 ));
86 };
87
88 let request = RegistrationRequest::new(
89 agent_descriptor,
90 auth_token,
91 requested_capabilities,
92 transport_protocol,
93 );
94
95 let request_message = FeagiMessage::AgentRegistration(
96 AgentRegistrationMessage::ClientRequestRegistration(request),
97 );
98
99 self.send_message(request_message, 0)?;
100 Ok(())
101 }
102
103 pub fn request_deregistration(
108 &mut self,
109 reason: Option<String>, ) -> Result<(), FeagiAgentError> {
111 let request = DeregistrationRequest::new(reason);
112 let message = FeagiMessage::AgentRegistration(
113 AgentRegistrationMessage::ClientRequestDeregistration(request),
114 );
115
116 self.send_message(message, 0)?;
117 Ok(())
118 }
119
120 pub fn send_heartbeat(&mut self) -> Result<(), FeagiAgentError> {
125 let heartbeat_message = FeagiMessage::HeartBeat;
126 self.send_message(heartbeat_message, 0)
127 }
128
129 pub fn request_disconnect(&mut self) -> Result<(), FeagiAgentError> {
134 let requester = self.requester.as_mut().ok_or_else(|| {
135 FeagiAgentError::ConnectionFailed("No socket is active to disconnect!".to_string())
136 })?;
137 requester.request_disconnect()?;
138 Ok(())
139 }
140
141 pub fn poll_for_messages(
146 &mut self,
147 ) -> Result<(&FeagiEndpointState, Option<FeagiMessage>), FeagiAgentError> {
148 let maybe_message = {
149 let requester = self.requester.as_mut().ok_or_else(|| {
150 FeagiAgentError::ConnectionFailed("No socket is active to poll!".to_string())
151 })?;
152
153 let state_snapshot = requester.poll().clone();
154 match state_snapshot {
155 FeagiEndpointState::Inactive
156 | FeagiEndpointState::Pending
157 | FeagiEndpointState::ActiveWaiting => Ok(None),
158 FeagiEndpointState::ActiveHasData => {
159 let data = requester.consume_retrieved_response()?;
160 self.request_buffer
161 .try_write_data_by_copy_and_verify(data)?;
162 let feagi_message: FeagiMessage = (&self.request_buffer).try_into()?;
163
164 match &feagi_message {
165 FeagiMessage::HeartBeat => Ok(Some(FeagiMessage::HeartBeat)),
166 FeagiMessage::AgentRegistration(registration_message) => {
167 match registration_message {
168 AgentRegistrationMessage::ClientRequestRegistration(_) => {
169 Err(FeagiAgentError::ConnectionFailed(
170 "Client cannot register agents!".to_string(),
171 ))
172 }
173 AgentRegistrationMessage::ServerRespondsRegistration(
174 registration_response,
175 ) => match registration_response {
176 RegistrationResponse::FailedInvalidRequest => {
177 Err(FeagiAgentError::ConnectionFailed(
178 "Invalid server responses!".to_string(),
179 ))
180 }
181 RegistrationResponse::FailedInvalidAuth => {
182 Err(FeagiAgentError::ConnectionFailed(
183 "Invalid auth token!".to_string(),
184 ))
185 }
186 RegistrationResponse::AlreadyRegistered => {
187 Err(FeagiAgentError::ConnectionFailed(
188 "Client already registered!".to_string(),
189 ))
190 }
191 RegistrationResponse::Success(session_id, endpoints) => {
192 self.registration_status =
193 AgentRegistrationStatus::Registered(
194 *session_id,
195 endpoints.clone(),
196 );
197 Ok(Some(feagi_message))
198 }
199 },
200 AgentRegistrationMessage::ClientRequestDeregistration(_) => {
201 Err(FeagiAgentError::ConnectionFailed(
202 "Client cannot receive deregistration request from server!"
203 .to_string(),
204 ))
205 }
206 AgentRegistrationMessage::ServerRespondsDeregistration(
207 deregistration_response,
208 ) => match deregistration_response {
209 DeregistrationResponse::Success => {
210 requester.request_disconnect()?;
211 self.registration_status =
212 AgentRegistrationStatus::NotRegistered;
213 Ok(Some(feagi_message))
214 }
215 DeregistrationResponse::NotRegistered => {
216 Ok(Some(feagi_message))
217 }
218 },
219 }
220 }
221 _ => Ok(Some(feagi_message)),
222 }
223 }
224 FeagiEndpointState::Errored(_) => {
225 requester.confirm_error_and_close()?;
226 Err(FeagiAgentError::ConnectionFailed(
227 "Error occurred".to_string(),
228 ))
229 }
230 }
231 }?;
232
233 let state = self
234 .requester
235 .as_mut()
236 .ok_or_else(|| {
237 FeagiAgentError::ConnectionFailed("No socket is active to poll!".to_string())
238 })?
239 .poll();
240
241 Ok((state, maybe_message))
242 }
243
244 pub fn send_message(
245 &mut self,
246 message: FeagiMessage,
247 increment_value: u16,
248 ) -> Result<(), FeagiAgentError> {
249 let agent_id = match &self.registration_status {
250 AgentRegistrationStatus::Registered(agent_id, _) => *agent_id,
251 AgentRegistrationStatus::NotRegistered => {
252 match &message {
255 FeagiMessage::AgentRegistration(
256 AgentRegistrationMessage::ClientRequestRegistration(_),
257 ) => AgentID::new_blank(),
258 _ => {
259 return Err(FeagiAgentError::UnableToSendData(
260 "Nonregistered agent cannot send message!".to_string(),
261 ));
262 }
263 }
264 }
265 };
266
267 if let Some(requester) = &mut self.requester {
268 message.serialize_to_byte_container(
269 &mut self.send_buffer,
270 agent_id,
271 increment_value,
272 )?;
273 requester.publish_request(self.send_buffer.get_byte_ref())?;
274 Ok(())
275 } else {
276 panic!("Active state but no socket!!")
278 }
279 }
280
281 }
283
284#[derive(Debug, PartialEq, Clone)]
285pub enum AgentRegistrationStatus {
286 NotRegistered,
287 Registered(
288 AgentID,
289 HashMap<AgentCapabilities, TransportProtocolEndpoint>,
290 ),
291}
292
#[cfg(test)]
mod tests {
    use super::*;
    use feagi_io::protocol_implementations::zmq::ZmqUrl;
    use feagi_io::traits_and_enums::client::{
        FeagiClient, FeagiClientRequester, FeagiClientRequesterProperties,
    };
    use feagi_io::traits_and_enums::shared::{FeagiEndpointState, TransportProtocolEndpoint};
    use feagi_io::FeagiNetworkError;
    use std::sync::{Arc, Mutex};

    /// Shared slot recording the bytes of the most recently published request.
    type SharedBytes = Arc<Mutex<Vec<u8>>>;

    /// Properties stub that hands out [`DummyRequester`] instances sharing one
    /// `last_request` slot.
    #[derive(Clone)]
    struct DummyRequesterProperties {
        endpoint: TransportProtocolEndpoint,
        last_request: SharedBytes,
    }

    /// In-memory requester: tracks a fake endpoint state and records whatever
    /// is published instead of touching a real transport.
    struct DummyRequester {
        endpoint: TransportProtocolEndpoint,
        state: FeagiEndpointState,
        last_request: SharedBytes,
    }

    impl FeagiClient for DummyRequester {
        fn poll(&mut self) -> &FeagiEndpointState {
            &self.state
        }

        // Connecting succeeds immediately and leaves the endpoint waiting for data.
        fn request_connect(&mut self) -> Result<(), FeagiNetworkError> {
            self.state = FeagiEndpointState::ActiveWaiting;
            Ok(())
        }

        fn request_disconnect(&mut self) -> Result<(), FeagiNetworkError> {
            self.state = FeagiEndpointState::Inactive;
            Ok(())
        }

        fn confirm_error_and_close(&mut self) -> Result<(), FeagiNetworkError> {
            self.state = FeagiEndpointState::Inactive;
            Ok(())
        }

        fn get_endpoint_target(&self) -> TransportProtocolEndpoint {
            self.endpoint.clone()
        }
    }

    impl FeagiClientRequester for DummyRequester {
        // Records the outgoing bytes so the test can inspect them.
        fn publish_request(&mut self, request: &[u8]) -> Result<(), FeagiNetworkError> {
            let mut slot = self.last_request.lock().expect("lock");
            *slot = request.to_vec();
            Ok(())
        }

        // The dummy never receives anything, so consuming always fails.
        fn consume_retrieved_response(&mut self) -> Result<&[u8], FeagiNetworkError> {
            Err(FeagiNetworkError::ReceiveFailed(
                "dummy requester has no responses".to_string(),
            ))
        }

        fn as_boxed_requester_properties(&self) -> Box<dyn FeagiClientRequesterProperties> {
            Box::new(DummyRequesterProperties {
                endpoint: self.endpoint.clone(),
                last_request: Arc::clone(&self.last_request),
            })
        }
    }

    impl FeagiClientRequesterProperties for DummyRequesterProperties {
        // Every requester starts inactive and shares the recording slot.
        fn as_boxed_client_requester(&self) -> Box<dyn FeagiClientRequester> {
            Box::new(DummyRequester {
                endpoint: self.endpoint.clone(),
                state: FeagiEndpointState::Inactive,
                last_request: Arc::clone(&self.last_request),
            })
        }

        fn get_endpoint_target(&self) -> TransportProtocolEndpoint {
            self.endpoint.clone()
        }
    }

    /// An unregistered but connected agent must be able to publish a
    /// registration request (sent with a blank agent id).
    #[test]
    fn registration_request_can_be_sent_before_registration() {
        let url = ZmqUrl::new("tcp://example:1").expect("valid dummy endpoint");
        let published: SharedBytes = Arc::new(Mutex::new(Vec::new()));
        let properties = DummyRequesterProperties {
            endpoint: TransportProtocolEndpoint::Zmq(url),
            last_request: Arc::clone(&published),
        };

        let mut agent = CommandControlAgent::new(Box::new(properties));
        agent
            .request_connect()
            .expect("connect request should succeed");

        let descriptor = AgentDescriptor::new("m", "n", 1).expect("descriptor");
        let token = AuthToken::new([0u8; 32]);
        let capabilities = vec![AgentCapabilities::SendSensorData];
        agent
            .request_registration(descriptor, token, capabilities)
            .expect("registration request should be sendable with blank id");

        let published_bytes = published.lock().expect("lock");
        assert!(
            !published_bytes.is_empty(),
            "expected a serialized registration request to be published"
        );
    }
}