Skip to main content

feagi_agent/clients/blocking/command_control_agent.rs

1use crate::command_and_control::agent_registration_message::{
2    AgentRegistrationMessage, DeregistrationRequest, DeregistrationResponse, RegistrationRequest,
3    RegistrationResponse,
4};
5use crate::command_and_control::FeagiMessage;
6use crate::{AgentCapabilities, AgentDescriptor, AuthToken, FeagiAgentError};
7use feagi_io::traits_and_enums::client::{FeagiClientRequester, FeagiClientRequesterProperties};
8use feagi_io::traits_and_enums::shared::{FeagiEndpointState, TransportProtocolEndpoint};
9use feagi_io::AgentID;
10use feagi_serialization::FeagiByteContainer;
11use std::collections::HashMap;
12
13pub struct CommandControlAgent {
14    properties: Box<dyn FeagiClientRequesterProperties>,
15    requester: Option<Box<dyn FeagiClientRequester>>,
16    request_buffer: FeagiByteContainer,
17    send_buffer: FeagiByteContainer,
18    registration_status: AgentRegistrationStatus,
19}
20
21impl CommandControlAgent {
22    pub fn new(endpoint_properties: Box<dyn FeagiClientRequesterProperties>) -> Self {
23        Self {
24            registration_status: AgentRegistrationStatus::NotRegistered,
25            properties: endpoint_properties,
26            requester: None,
27            request_buffer: FeagiByteContainer::new_empty(),
28            send_buffer: FeagiByteContainer::new_empty(),
29        }
30    }
31
32    //region Properties
33    pub fn registration_status(&self) -> &AgentRegistrationStatus {
34        &self.registration_status
35    }
36
37    pub fn registered_endpoint_target(&mut self) -> TransportProtocolEndpoint {
38        self.properties.get_endpoint_target()
39    }
40    //endregion
41
42    //region Helpers
43
44    pub fn request_connect(&mut self) -> Result<(), FeagiAgentError> {
45        if self.registration_status != AgentRegistrationStatus::NotRegistered {
46            return Err(FeagiAgentError::ConnectionFailed(
47                "Agent already connected and registered!".to_string(),
48            ));
49        }
50
51        if self.requester.is_none() {
52            self.requester = Some(self.properties.as_boxed_client_requester());
53        }
54
55        let mut requester = self.requester.take().unwrap();
56
57        match requester.poll() {
58            FeagiEndpointState::Inactive => {
59                requester.request_connect()?;
60                self.requester = Some(requester);
61                Ok(())
62            }
63            _ => {
64                self.requester = Some(requester);
65                Err(FeagiAgentError::ConnectionFailed(
66                    "Socket is already active!".to_string(),
67                ))
68            }
69        }
70    }
71
72    pub fn request_registration(
73        &mut self,
74        agent_descriptor: AgentDescriptor,
75        auth_token: AuthToken,
76        requested_capabilities: Vec<AgentCapabilities>,
77    ) -> Result<(), FeagiAgentError> {
78        let transport_protocol = if let Some(requester) = &mut self.requester {
79            requester
80                .get_endpoint_target()
81                .as_transport_protocol_implementation()
82        } else {
83            return Err(FeagiAgentError::ConnectionFailed(
84                "Cannot register to endpoint when not connected!".to_string(),
85            ));
86        };
87
88        let request = RegistrationRequest::new(
89            agent_descriptor,
90            auth_token,
91            requested_capabilities,
92            transport_protocol,
93        );
94
95        let request_message = FeagiMessage::AgentRegistration(
96            AgentRegistrationMessage::ClientRequestRegistration(request),
97        );
98
99        self.send_message(request_message, 0)?;
100        Ok(())
101    }
102
103    /// Request voluntary deregistration for the given session.
104    ///
105    /// The optional `reason` string is forwarded for observability and does not
106    /// alter deregistration behavior on the server.
107    pub fn request_deregistration(
108        &mut self,
109        reason: Option<String>, // TODO Please dont use strings, use ENUMS!
110    ) -> Result<(), FeagiAgentError> {
111        let request = DeregistrationRequest::new(reason);
112        let message = FeagiMessage::AgentRegistration(
113            AgentRegistrationMessage::ClientRequestDeregistration(request),
114        );
115
116        self.send_message(message, 0)?;
117        Ok(())
118    }
119
120    /// Send a heartbeat over the command/control channel for the provided session.
121    ///
122    /// This is the deterministic, tick-driven heartbeat primitive used by
123    /// higher-level client loops.
124    pub fn send_heartbeat(&mut self) -> Result<(), FeagiAgentError> {
125        let heartbeat_message = FeagiMessage::HeartBeat;
126        self.send_message(heartbeat_message, 0)
127    }
128
129    /// Request transport disconnect for command/control requester.
130    ///
131    /// This is a best-effort disconnect primitive used by higher-level shutdown
132    /// orchestration to ensure sockets are torn down deterministically.
133    pub fn request_disconnect(&mut self) -> Result<(), FeagiAgentError> {
134        let requester = self.requester.as_mut().ok_or_else(|| {
135            FeagiAgentError::ConnectionFailed("No socket is active to disconnect!".to_string())
136        })?;
137        requester.request_disconnect()?;
138        Ok(())
139    }
140
141    //endregion
142
143    //region Base Functions
144
145    pub fn poll_for_messages(
146        &mut self,
147    ) -> Result<(&FeagiEndpointState, Option<FeagiMessage>), FeagiAgentError> {
148        let maybe_message = {
149            let requester = self.requester.as_mut().ok_or_else(|| {
150                FeagiAgentError::ConnectionFailed("No socket is active to poll!".to_string())
151            })?;
152
153            let state_snapshot = requester.poll().clone();
154            match state_snapshot {
155                FeagiEndpointState::Inactive
156                | FeagiEndpointState::Pending
157                | FeagiEndpointState::ActiveWaiting => Ok(None),
158                FeagiEndpointState::ActiveHasData => {
159                    let data = requester.consume_retrieved_response()?;
160                    self.request_buffer
161                        .try_write_data_by_copy_and_verify(data)?;
162                    let feagi_message: FeagiMessage = (&self.request_buffer).try_into()?;
163
164                    match &feagi_message {
165                        FeagiMessage::HeartBeat => Ok(Some(FeagiMessage::HeartBeat)),
166                        FeagiMessage::AgentRegistration(registration_message) => {
167                            match registration_message {
168                                AgentRegistrationMessage::ClientRequestRegistration(_) => {
169                                    Err(FeagiAgentError::ConnectionFailed(
170                                        "Client cannot register agents!".to_string(),
171                                    ))
172                                }
173                                AgentRegistrationMessage::ServerRespondsRegistration(
174                                    registration_response,
175                                ) => match registration_response {
176                                    RegistrationResponse::FailedInvalidRequest => {
177                                        Err(FeagiAgentError::ConnectionFailed(
178                                            "Invalid server responses!".to_string(),
179                                        ))
180                                    }
181                                    RegistrationResponse::FailedInvalidAuth => {
182                                        Err(FeagiAgentError::ConnectionFailed(
183                                            "Invalid auth token!".to_string(),
184                                        ))
185                                    }
186                                    RegistrationResponse::AlreadyRegistered => {
187                                        Err(FeagiAgentError::ConnectionFailed(
188                                            "Client already registered!".to_string(),
189                                        ))
190                                    }
191                                    RegistrationResponse::Success(session_id, endpoints) => {
192                                        self.registration_status =
193                                            AgentRegistrationStatus::Registered(
194                                                *session_id,
195                                                endpoints.clone(),
196                                            );
197                                        Ok(Some(feagi_message))
198                                    }
199                                },
200                                AgentRegistrationMessage::ClientRequestDeregistration(_) => {
201                                    Err(FeagiAgentError::ConnectionFailed(
202                                        "Client cannot receive deregistration request from server!"
203                                            .to_string(),
204                                    ))
205                                }
206                                AgentRegistrationMessage::ServerRespondsDeregistration(
207                                    deregistration_response,
208                                ) => match deregistration_response {
209                                    DeregistrationResponse::Success => {
210                                        requester.request_disconnect()?;
211                                        self.registration_status =
212                                            AgentRegistrationStatus::NotRegistered;
213                                        Ok(Some(feagi_message))
214                                    }
215                                    DeregistrationResponse::NotRegistered => {
216                                        Ok(Some(feagi_message))
217                                    }
218                                },
219                            }
220                        }
221                        _ => Ok(Some(feagi_message)),
222                    }
223                }
224                FeagiEndpointState::Errored(_) => {
225                    requester.confirm_error_and_close()?;
226                    Err(FeagiAgentError::ConnectionFailed(
227                        "Error occurred".to_string(),
228                    ))
229                }
230            }
231        }?;
232
233        let state = self
234            .requester
235            .as_mut()
236            .ok_or_else(|| {
237                FeagiAgentError::ConnectionFailed("No socket is active to poll!".to_string())
238            })?
239            .poll();
240
241        Ok((state, maybe_message))
242    }
243
244    pub fn send_message(
245        &mut self,
246        message: FeagiMessage,
247        increment_value: u16,
248    ) -> Result<(), FeagiAgentError> {
249        let agent_id = match &self.registration_status {
250            AgentRegistrationStatus::Registered(agent_id, _) => *agent_id,
251            AgentRegistrationStatus::NotRegistered => {
252                // Registration must be possible before a session id exists.
253                // FEAGI servers accept a blank agent id for registration requests.
254                match &message {
255                    FeagiMessage::AgentRegistration(
256                        AgentRegistrationMessage::ClientRequestRegistration(_),
257                    ) => AgentID::new_blank(),
258                    _ => {
259                        return Err(FeagiAgentError::UnableToSendData(
260                            "Nonregistered agent cannot send message!".to_string(),
261                        ));
262                    }
263                }
264            }
265        };
266
267        if let Some(requester) = &mut self.requester {
268            message.serialize_to_byte_container(
269                &mut self.send_buffer,
270                agent_id,
271                increment_value,
272            )?;
273            requester.publish_request(self.send_buffer.get_byte_ref())?;
274            Ok(())
275        } else {
276            // This state should be impossible. something went very wrong
277            panic!("Active state but no socket!!")
278        }
279    }
280
281    //endregion
282}
283
284#[derive(Debug, PartialEq, Clone)]
285pub enum AgentRegistrationStatus {
286    NotRegistered,
287    Registered(
288        AgentID,
289        HashMap<AgentCapabilities, TransportProtocolEndpoint>,
290    ),
291}
292
#[cfg(test)]
mod tests {
    use super::*;
    use feagi_io::protocol_implementations::zmq::ZmqUrl;
    use feagi_io::traits_and_enums::client::{
        FeagiClient, FeagiClientRequester, FeagiClientRequesterProperties,
    };
    use feagi_io::traits_and_enums::shared::{FeagiEndpointState, TransportProtocolEndpoint};
    use feagi_io::FeagiNetworkError;
    use std::sync::{Arc, Mutex};

    /// Properties stub that builds `DummyRequester`s sharing one capture buffer.
    #[derive(Clone)]
    struct DummyRequesterProperties {
        endpoint: TransportProtocolEndpoint,
        last_request: Arc<Mutex<Vec<u8>>>,
    }

    /// In-memory requester: records the last published request and never
    /// produces a response.
    struct DummyRequester {
        endpoint: TransportProtocolEndpoint,
        state: FeagiEndpointState,
        last_request: Arc<Mutex<Vec<u8>>>,
    }

    impl FeagiClient for DummyRequester {
        fn poll(&mut self) -> &FeagiEndpointState {
            &self.state
        }

        fn request_connect(&mut self) -> Result<(), FeagiNetworkError> {
            // Connecting succeeds immediately in this stub.
            self.state = FeagiEndpointState::ActiveWaiting;
            Ok(())
        }

        fn request_disconnect(&mut self) -> Result<(), FeagiNetworkError> {
            self.state = FeagiEndpointState::Inactive;
            Ok(())
        }

        fn confirm_error_and_close(&mut self) -> Result<(), FeagiNetworkError> {
            self.state = FeagiEndpointState::Inactive;
            Ok(())
        }

        fn get_endpoint_target(&self) -> TransportProtocolEndpoint {
            self.endpoint.clone()
        }
    }

    impl FeagiClientRequester for DummyRequester {
        fn publish_request(&mut self, request: &[u8]) -> Result<(), FeagiNetworkError> {
            // Capture the bytes so the test can inspect what was sent.
            let mut captured = self.last_request.lock().expect("lock");
            *captured = request.to_vec();
            Ok(())
        }

        fn consume_retrieved_response(&mut self) -> Result<&[u8], FeagiNetworkError> {
            Err(FeagiNetworkError::ReceiveFailed(
                "dummy requester has no responses".to_string(),
            ))
        }

        fn as_boxed_requester_properties(&self) -> Box<dyn FeagiClientRequesterProperties> {
            Box::new(DummyRequesterProperties {
                endpoint: self.endpoint.clone(),
                last_request: self.last_request.clone(),
            })
        }
    }

    impl FeagiClientRequesterProperties for DummyRequesterProperties {
        fn as_boxed_client_requester(&self) -> Box<dyn FeagiClientRequester> {
            Box::new(DummyRequester {
                endpoint: self.endpoint.clone(),
                state: FeagiEndpointState::Inactive,
                last_request: self.last_request.clone(),
            })
        }

        fn get_endpoint_target(&self) -> TransportProtocolEndpoint {
            self.endpoint.clone()
        }
    }

    /// A registration request must go out even though the agent has no session
    /// id yet.
    #[test]
    fn registration_request_can_be_sent_before_registration() {
        let zmq_url = ZmqUrl::new("tcp://example:1").expect("valid dummy endpoint");
        let published: Arc<Mutex<Vec<u8>>> = Arc::new(Mutex::new(Vec::new()));
        let properties = DummyRequesterProperties {
            endpoint: TransportProtocolEndpoint::Zmq(zmq_url),
            last_request: Arc::clone(&published),
        };

        let mut agent = CommandControlAgent::new(Box::new(properties));
        agent
            .request_connect()
            .expect("connect request should succeed");

        let descriptor = AgentDescriptor::new("m", "n", 1).expect("descriptor");
        let token = AuthToken::new([0u8; 32]);
        agent
            .request_registration(descriptor, token, vec![AgentCapabilities::SendSensorData])
            .expect("registration request should be sendable with blank id");

        let published_bytes = published.lock().expect("lock");
        assert!(
            !published_bytes.is_empty(),
            "expected a serialized registration request to be published"
        );
    }
}