1use crate::command_and_control::agent_registration_message::{
15 AgentRegistrationMessage, DeregistrationResponse, RegistrationResponse,
16};
17use crate::command_and_control::FeagiMessage;
18use crate::{AgentCapabilities, AgentDescriptor, AuthToken};
19use feagi_io::traits_and_enums::shared::{FeagiEndpointState, TransportProtocolEndpoint};
20use feagi_io::AgentID;
21use std::collections::HashMap;
22
/// Timestamp, in milliseconds, handed to the session state machine by its caller.
///
/// The state machine only ever subtracts one `NowMs` from another (with
/// saturation), so the clock source and epoch are entirely up to the caller.
pub type NowMs = u64;
25
/// Timing parameters that drive heartbeats and the registration timeout.
#[derive(Debug, Clone)]
pub struct SessionTimingConfig {
    /// Minimum number of milliseconds between two heartbeats while the session
    /// is active. A value of `0` disables heartbeats altogether.
    pub heartbeat_interval_ms: u64,
    /// Maximum number of milliseconds the session may spend connecting the
    /// control channel and registering before it is marked as failed.
    /// `None` means no deadline is enforced.
    pub registration_deadline_ms: Option<u64>,
}
33
34#[derive(Debug, Clone)]
35pub struct SessionInit {
36 pub agent_descriptor: AgentDescriptor,
37 pub auth_token: AuthToken,
38 pub requested_capabilities: Vec<AgentCapabilities>,
39 pub timing: SessionTimingConfig,
40}
41
/// Lifecycle phase of a [`SessionStateMachine`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionPhase {
    /// No session in progress: the initial phase, and the phase entered once a
    /// deregistration response has been processed.
    Idle,
    /// A control-channel connection has been requested and is not yet active.
    ControlConnecting,
    /// The registration request has been issued; waiting for the server reply.
    Registering,
    /// Registration succeeded and the sensor/motor connections were requested;
    /// waiting for both data channels to become active.
    DataConnecting,
    /// Both data channels are active; heartbeats are emitted in this phase.
    Active,
    /// A deregistration request has been issued; waiting for the server reply.
    Deregistering,
    /// The session hit an error; the message is kept as the last error.
    Failed,
}
52
53#[derive(Debug, Clone)]
54pub struct SessionStateMachine {
55 init: SessionInit,
56 phase: SessionPhase,
57 connect_started_at_ms: Option<NowMs>,
58 last_heartbeat_sent_at_ms: Option<NowMs>,
59 session_id: Option<AgentID>,
60 endpoints: Option<HashMap<AgentCapabilities, TransportProtocolEndpoint>>,
61 last_error: Option<String>,
62}
63
64impl SessionStateMachine {
65 pub fn new(init: SessionInit) -> Self {
66 Self {
67 init,
68 phase: SessionPhase::Idle,
69 connect_started_at_ms: None,
70 last_heartbeat_sent_at_ms: None,
71 session_id: None,
72 endpoints: None,
73 last_error: None,
74 }
75 }
76
77 pub fn phase(&self) -> &SessionPhase {
78 &self.phase
79 }
80
81 pub fn session_id(&self) -> Option<AgentID> {
82 self.session_id
83 }
84
85 pub fn endpoints(&self) -> Option<&HashMap<AgentCapabilities, TransportProtocolEndpoint>> {
86 self.endpoints.as_ref()
87 }
88
89 pub fn last_error(&self) -> Option<&str> {
90 self.last_error.as_deref()
91 }
92
93 pub fn start_connect(&mut self, now_ms: NowMs) -> Vec<SessionAction> {
95 self.connect_started_at_ms = Some(now_ms);
96 self.last_error = None;
97 self.session_id = None;
98 self.endpoints = None;
99 self.last_heartbeat_sent_at_ms = None;
100 self.phase = SessionPhase::ControlConnecting;
101 vec![SessionAction::ControlRequestConnect]
102 }
103
104 pub fn start_deregister(&mut self, reason: Option<String>) -> Vec<SessionAction> {
106 match self.phase {
107 SessionPhase::Active | SessionPhase::DataConnecting | SessionPhase::Registering => {
108 self.phase = SessionPhase::Deregistering;
109 vec![SessionAction::ControlSendDeregistration { reason }]
110 }
111 _ => Vec::new(),
112 }
113 }
114
115 pub fn step(&mut self, now_ms: NowMs, events: &[SessionEvent]) -> Vec<SessionAction> {
117 let mut actions: Vec<SessionAction> = Vec::new();
118
119 if matches!(
121 self.phase,
122 SessionPhase::ControlConnecting | SessionPhase::Registering
123 ) {
124 if let (Some(start_ms), Some(deadline_ms)) = (
125 self.connect_started_at_ms,
126 self.init.timing.registration_deadline_ms,
127 ) {
128 if now_ms.saturating_sub(start_ms) > deadline_ms {
129 self.fail("registration deadline exceeded");
130 return actions;
131 }
132 }
133 }
134
135 for event in events {
136 match event {
137 SessionEvent::ControlObserved { state, message } => {
138 actions.extend(self.on_control_observed(now_ms, state, message.clone()));
139 }
140 SessionEvent::SensorObserved { state } => {
141 actions.extend(self.on_sensor_observed(state.clone()));
142 }
143 SessionEvent::MotorObserved { state } => {
144 actions.extend(self.on_motor_observed(state.clone()));
145 }
146 SessionEvent::Deregistered { response } => {
147 actions.extend(self.on_deregistered(response.clone()));
148 }
149 }
150 }
151
152 if self.phase == SessionPhase::Active && self.heartbeat_due(now_ms) {
154 actions.push(SessionAction::ControlSendHeartbeat);
155 self.last_heartbeat_sent_at_ms = Some(now_ms);
156 }
157
158 actions
159 }
160
161 fn heartbeat_due(&self, now_ms: NowMs) -> bool {
162 let interval = self.init.timing.heartbeat_interval_ms;
163 if interval == 0 {
164 return false;
166 }
167 match self.last_heartbeat_sent_at_ms {
168 None => true,
169 Some(last) => now_ms.saturating_sub(last) >= interval,
170 }
171 }
172
173 fn on_control_observed(
174 &mut self,
175 now_ms: NowMs,
176 state: &FeagiEndpointState,
177 message: Option<FeagiMessage>,
178 ) -> Vec<SessionAction> {
179 match self.phase {
180 SessionPhase::ControlConnecting => match state {
181 FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData => {
182 self.phase = SessionPhase::Registering;
183 vec![SessionAction::ControlSendRegistration {
184 agent_descriptor: self.init.agent_descriptor.clone(),
185 auth_token: self.init.auth_token.clone(),
186 requested_capabilities: self.init.requested_capabilities.clone(),
187 }]
188 }
189 FeagiEndpointState::Errored(e) => {
190 self.fail(&format!("control errored: {e}"));
191 Vec::new()
192 }
193 _ => Vec::new(),
194 },
195 SessionPhase::Registering => {
196 if let Some(FeagiMessage::AgentRegistration(reg_msg)) = message {
197 match reg_msg {
198 AgentRegistrationMessage::ServerRespondsRegistration(resp) => {
199 return self.on_registration_response(now_ms, resp);
200 }
201 AgentRegistrationMessage::ServerRespondsDeregistration(resp) => {
202 return self.on_deregistered(resp);
203 }
204 _ => {}
205 }
206 }
207 if let FeagiEndpointState::Errored(e) = state {
208 self.fail(&format!("control errored: {e}"));
209 }
210 Vec::new()
211 }
212 SessionPhase::Active => {
213 if let Some(FeagiMessage::HeartBeat) = message {
214 }
216 if let Some(FeagiMessage::AgentRegistration(
217 AgentRegistrationMessage::ServerRespondsDeregistration(resp),
218 )) = message
219 {
220 return self.on_deregistered(resp);
221 }
222 if let FeagiEndpointState::Errored(e) = state {
223 self.fail(&format!("control errored: {e}"));
224 }
225 Vec::new()
226 }
227 SessionPhase::Deregistering => {
228 if let Some(FeagiMessage::AgentRegistration(
229 AgentRegistrationMessage::ServerRespondsDeregistration(resp),
230 )) = message
231 {
232 return self.on_deregistered(resp);
233 }
234 if let FeagiEndpointState::Errored(e) = state {
235 self.fail(&format!("control errored: {e}"));
236 }
237 Vec::new()
238 }
239 _ => Vec::new(),
240 }
241 }
242
243 fn on_registration_response(
244 &mut self,
245 now_ms: NowMs,
246 resp: RegistrationResponse,
247 ) -> Vec<SessionAction> {
248 match resp {
249 RegistrationResponse::Success(session_id, endpoints) => {
250 self.session_id = Some(session_id);
251 self.endpoints = Some(endpoints.clone());
252 self.phase = SessionPhase::DataConnecting;
253 self.last_heartbeat_sent_at_ms = Some(now_ms);
254
255 let sensory = endpoints.get(&AgentCapabilities::SendSensorData);
257 let motor = endpoints.get(&AgentCapabilities::ReceiveMotorData);
258 if sensory.is_none() || motor.is_none() {
259 self.fail("registration success missing required endpoints");
260 return Vec::new();
261 }
262 vec![
263 SessionAction::SensorConnectTo {
264 endpoint: sensory.unwrap().clone(),
265 },
266 SessionAction::MotorConnectTo {
267 endpoint: motor.unwrap().clone(),
268 },
269 ]
270 }
271 RegistrationResponse::FailedInvalidAuth => {
272 self.fail("registration failed: invalid auth");
273 Vec::new()
274 }
275 RegistrationResponse::FailedInvalidRequest => {
276 self.fail("registration failed: invalid request");
277 Vec::new()
278 }
279 RegistrationResponse::AlreadyRegistered => {
280 self.fail("registration failed: already registered");
281 Vec::new()
282 }
283 }
284 }
285
286 fn on_sensor_observed(&mut self, state: FeagiEndpointState) -> Vec<SessionAction> {
287 if self.phase != SessionPhase::DataConnecting {
288 return Vec::new();
289 }
290 if let FeagiEndpointState::Errored(e) = state {
291 self.fail(&format!("sensor errored: {e}"));
292 return Vec::new();
293 }
294 Vec::new()
296 }
297
298 fn on_motor_observed(&mut self, state: FeagiEndpointState) -> Vec<SessionAction> {
299 if self.phase != SessionPhase::DataConnecting {
300 return Vec::new();
301 }
302 if let FeagiEndpointState::Errored(e) = state {
303 self.fail(&format!("motor errored: {e}"));
304 return Vec::new();
305 }
306 Vec::new()
307 }
308
309 fn on_deregistered(&mut self, response: DeregistrationResponse) -> Vec<SessionAction> {
310 if self.phase != SessionPhase::Deregistering && self.phase != SessionPhase::Registering {
311 }
313 match response {
314 DeregistrationResponse::Success | DeregistrationResponse::NotRegistered => {
315 self.phase = SessionPhase::Idle;
316 self.session_id = None;
317 self.endpoints = None;
318 self.last_heartbeat_sent_at_ms = None;
319 self.connect_started_at_ms = None;
320 }
321 }
322 Vec::new()
323 }
324
325 fn fail(&mut self, msg: &str) {
326 self.phase = SessionPhase::Failed;
327 self.last_error = Some(msg.to_string());
328 }
329
330 pub fn try_mark_data_channels_active(
333 &mut self,
334 sensor_state: &FeagiEndpointState,
335 motor_state: &FeagiEndpointState,
336 ) {
337 if self.phase != SessionPhase::DataConnecting {
338 return;
339 }
340 let sensor_ok = matches!(
341 sensor_state,
342 FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData
343 );
344 let motor_ok = matches!(
345 motor_state,
346 FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData
347 );
348 if sensor_ok && motor_ok {
349 self.phase = SessionPhase::Active;
350 }
351 }
352}
353
354#[allow(clippy::large_enum_variant)]
355#[derive(Debug, Clone)]
356pub enum SessionEvent {
357 ControlObserved {
358 state: FeagiEndpointState,
359 message: Option<FeagiMessage>,
360 },
361 SensorObserved {
362 state: FeagiEndpointState,
363 },
364 MotorObserved {
365 state: FeagiEndpointState,
366 },
367 Deregistered {
368 response: DeregistrationResponse,
369 },
370}
371
372#[derive(Debug, Clone)]
373pub enum SessionAction {
374 ControlRequestConnect,
375 ControlSendRegistration {
376 agent_descriptor: AgentDescriptor,
377 auth_token: AuthToken,
378 requested_capabilities: Vec<AgentCapabilities>,
379 },
380 ControlSendHeartbeat,
381 ControlSendDeregistration {
382 reason: Option<String>,
383 },
384 SensorConnectTo {
385 endpoint: TransportProtocolEndpoint,
386 },
387 MotorConnectTo {
388 endpoint: TransportProtocolEndpoint,
389 },
390}