Skip to main content

feagi_agent/clients/
session_state_machine.rs

1//! Runtime-agnostic session orchestration state machine.
2//!
3//! This module provides a pure, deterministic orchestration layer above the
4//! poll-based `feagi-io` endpoint state machines.
5//!
6//! Design constraints:
7//! - No sleeps, no threads, no blocking waits
8//! - No hardcoded timeouts/retries/backoff
9//! - Runtime-agnostic (Tokio/WASM/Embassy/RTOS drivers can all execute Actions)
10//! - Transport-agnostic (ZMQ and WebSocket are supported via `TransportProtocolEndpoint`)
11//!
12//! @cursor:critical-path
13
14use crate::command_and_control::agent_registration_message::{
15    AgentRegistrationMessage, DeregistrationResponse, RegistrationResponse,
16};
17use crate::command_and_control::FeagiMessage;
18use crate::{AgentCapabilities, AgentDescriptor, AuthToken};
19use feagi_io::traits_and_enums::shared::{FeagiEndpointState, TransportProtocolEndpoint};
20use feagi_io::AgentID;
21use std::collections::HashMap;
22
/// Timestamp, in milliseconds, taken from a monotonic clock owned by the driver.
///
/// The state machine never reads a clock itself; every time-dependent decision
/// is made from the `now_ms` value the driver passes in.
pub type NowMs = u64;
25
/// Timing policy supplied by the caller; the state machine hardcodes none of it.
#[derive(Clone, Debug)]
pub struct SessionTimingConfig {
    /// Minimum spacing, in milliseconds, between heartbeats on the
    /// command/control channel. A value of `0` is treated as "heartbeats
    /// disabled"; callers are expected not to rely on that.
    pub heartbeat_interval_ms: u64,
    /// Optional registration deadline in milliseconds, measured from the
    /// `now_ms` passed to `start_connect()`. It is only enforced while the
    /// session is connecting the control channel or registering; `None`
    /// disables the check.
    pub registration_deadline_ms: Option<u64>,
}
33
34#[derive(Debug, Clone)]
35pub struct SessionInit {
36    pub agent_descriptor: AgentDescriptor,
37    pub auth_token: AuthToken,
38    pub requested_capabilities: Vec<AgentCapabilities>,
39    pub timing: SessionTimingConfig,
40}
41
/// Coarse lifecycle phase of a session.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum SessionPhase {
    /// Initial phase, and the phase entered again after a deregistration response.
    Idle,
    /// `start_connect()` was called; waiting for the control endpoint to become active.
    ControlConnecting,
    /// Registration request issued; waiting for the server's registration response.
    Registering,
    /// Registration succeeded; waiting for the sensor and motor channels to become active.
    DataConnecting,
    /// Control and data channels are up; heartbeats are scheduled in this phase.
    Active,
    /// `start_deregister()` was called; waiting for the server's deregistration response.
    Deregistering,
    /// A failure was recorded; the reason is available through `last_error()`.
    Failed,
}
52
53#[derive(Debug, Clone)]
54pub struct SessionStateMachine {
55    init: SessionInit,
56    phase: SessionPhase,
57    connect_started_at_ms: Option<NowMs>,
58    last_heartbeat_sent_at_ms: Option<NowMs>,
59    session_id: Option<AgentID>,
60    endpoints: Option<HashMap<AgentCapabilities, TransportProtocolEndpoint>>,
61    last_error: Option<String>,
62}
63
64impl SessionStateMachine {
65    pub fn new(init: SessionInit) -> Self {
66        Self {
67            init,
68            phase: SessionPhase::Idle,
69            connect_started_at_ms: None,
70            last_heartbeat_sent_at_ms: None,
71            session_id: None,
72            endpoints: None,
73            last_error: None,
74        }
75    }
76
77    pub fn phase(&self) -> &SessionPhase {
78        &self.phase
79    }
80
81    pub fn session_id(&self) -> Option<AgentID> {
82        self.session_id
83    }
84
85    pub fn endpoints(&self) -> Option<&HashMap<AgentCapabilities, TransportProtocolEndpoint>> {
86        self.endpoints.as_ref()
87    }
88
89    pub fn last_error(&self) -> Option<&str> {
90        self.last_error.as_deref()
91    }
92
93    /// Begin connection orchestration. Returns the initial actions.
94    pub fn start_connect(&mut self, now_ms: NowMs) -> Vec<SessionAction> {
95        self.connect_started_at_ms = Some(now_ms);
96        self.last_error = None;
97        self.session_id = None;
98        self.endpoints = None;
99        self.last_heartbeat_sent_at_ms = None;
100        self.phase = SessionPhase::ControlConnecting;
101        vec![SessionAction::ControlRequestConnect]
102    }
103
104    /// Request a graceful deregistration. Returns actions to perform now.
105    pub fn start_deregister(&mut self, reason: Option<String>) -> Vec<SessionAction> {
106        match self.phase {
107            SessionPhase::Active | SessionPhase::DataConnecting | SessionPhase::Registering => {
108                self.phase = SessionPhase::Deregistering;
109                vec![SessionAction::ControlSendDeregistration { reason }]
110            }
111            _ => Vec::new(),
112        }
113    }
114
115    /// Advance the state machine by providing observed events.
116    pub fn step(&mut self, now_ms: NowMs, events: &[SessionEvent]) -> Vec<SessionAction> {
117        let mut actions: Vec<SessionAction> = Vec::new();
118
119        // Optional registration deadline enforcement (policy from config, not hardcoded).
120        if matches!(
121            self.phase,
122            SessionPhase::ControlConnecting | SessionPhase::Registering
123        ) {
124            if let (Some(start_ms), Some(deadline_ms)) = (
125                self.connect_started_at_ms,
126                self.init.timing.registration_deadline_ms,
127            ) {
128                if now_ms.saturating_sub(start_ms) > deadline_ms {
129                    self.fail("registration deadline exceeded");
130                    return actions;
131                }
132            }
133        }
134
135        for event in events {
136            match event {
137                SessionEvent::ControlObserved { state, message } => {
138                    actions.extend(self.on_control_observed(now_ms, state, message.clone()));
139                }
140                SessionEvent::SensorObserved { state } => {
141                    actions.extend(self.on_sensor_observed(state.clone()));
142                }
143                SessionEvent::MotorObserved { state } => {
144                    actions.extend(self.on_motor_observed(state.clone()));
145                }
146                SessionEvent::Deregistered { response } => {
147                    actions.extend(self.on_deregistered(response.clone()));
148                }
149            }
150        }
151
152        // Heartbeat scheduling: driver provides `now_ms`, we emit action only when due.
153        if self.phase == SessionPhase::Active && self.heartbeat_due(now_ms) {
154            actions.push(SessionAction::ControlSendHeartbeat);
155            self.last_heartbeat_sent_at_ms = Some(now_ms);
156        }
157
158        actions
159    }
160
161    fn heartbeat_due(&self, now_ms: NowMs) -> bool {
162        let interval = self.init.timing.heartbeat_interval_ms;
163        if interval == 0 {
164            // Policy: treat as disabled; caller should never set to 0.
165            return false;
166        }
167        match self.last_heartbeat_sent_at_ms {
168            None => true,
169            Some(last) => now_ms.saturating_sub(last) >= interval,
170        }
171    }
172
173    fn on_control_observed(
174        &mut self,
175        now_ms: NowMs,
176        state: &FeagiEndpointState,
177        message: Option<FeagiMessage>,
178    ) -> Vec<SessionAction> {
179        match self.phase {
180            SessionPhase::ControlConnecting => match state {
181                FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData => {
182                    self.phase = SessionPhase::Registering;
183                    vec![SessionAction::ControlSendRegistration {
184                        agent_descriptor: self.init.agent_descriptor.clone(),
185                        auth_token: self.init.auth_token.clone(),
186                        requested_capabilities: self.init.requested_capabilities.clone(),
187                    }]
188                }
189                FeagiEndpointState::Errored(e) => {
190                    self.fail(&format!("control errored: {e}"));
191                    Vec::new()
192                }
193                _ => Vec::new(),
194            },
195            SessionPhase::Registering => {
196                if let Some(FeagiMessage::AgentRegistration(reg_msg)) = message {
197                    match reg_msg {
198                        AgentRegistrationMessage::ServerRespondsRegistration(resp) => {
199                            return self.on_registration_response(now_ms, resp);
200                        }
201                        AgentRegistrationMessage::ServerRespondsDeregistration(resp) => {
202                            return self.on_deregistered(resp);
203                        }
204                        _ => {}
205                    }
206                }
207                if let FeagiEndpointState::Errored(e) = state {
208                    self.fail(&format!("control errored: {e}"));
209                }
210                Vec::new()
211            }
212            SessionPhase::Active => {
213                if let Some(FeagiMessage::HeartBeat) = message {
214                    // Heartbeat ack; no state change required.
215                }
216                if let Some(FeagiMessage::AgentRegistration(
217                    AgentRegistrationMessage::ServerRespondsDeregistration(resp),
218                )) = message
219                {
220                    return self.on_deregistered(resp);
221                }
222                if let FeagiEndpointState::Errored(e) = state {
223                    self.fail(&format!("control errored: {e}"));
224                }
225                Vec::new()
226            }
227            SessionPhase::Deregistering => {
228                if let Some(FeagiMessage::AgentRegistration(
229                    AgentRegistrationMessage::ServerRespondsDeregistration(resp),
230                )) = message
231                {
232                    return self.on_deregistered(resp);
233                }
234                if let FeagiEndpointState::Errored(e) = state {
235                    self.fail(&format!("control errored: {e}"));
236                }
237                Vec::new()
238            }
239            _ => Vec::new(),
240        }
241    }
242
243    fn on_registration_response(
244        &mut self,
245        now_ms: NowMs,
246        resp: RegistrationResponse,
247    ) -> Vec<SessionAction> {
248        match resp {
249            RegistrationResponse::Success(session_id, endpoints) => {
250                self.session_id = Some(session_id);
251                self.endpoints = Some(endpoints.clone());
252                self.phase = SessionPhase::DataConnecting;
253                self.last_heartbeat_sent_at_ms = Some(now_ms);
254
255                // Require sensory + motor endpoints.
256                let sensory = endpoints.get(&AgentCapabilities::SendSensorData);
257                let motor = endpoints.get(&AgentCapabilities::ReceiveMotorData);
258                if sensory.is_none() || motor.is_none() {
259                    self.fail("registration success missing required endpoints");
260                    return Vec::new();
261                }
262                vec![
263                    SessionAction::SensorConnectTo {
264                        endpoint: sensory.unwrap().clone(),
265                    },
266                    SessionAction::MotorConnectTo {
267                        endpoint: motor.unwrap().clone(),
268                    },
269                ]
270            }
271            RegistrationResponse::FailedInvalidAuth => {
272                self.fail("registration failed: invalid auth");
273                Vec::new()
274            }
275            RegistrationResponse::FailedInvalidRequest => {
276                self.fail("registration failed: invalid request");
277                Vec::new()
278            }
279            RegistrationResponse::AlreadyRegistered => {
280                self.fail("registration failed: already registered");
281                Vec::new()
282            }
283        }
284    }
285
286    fn on_sensor_observed(&mut self, state: FeagiEndpointState) -> Vec<SessionAction> {
287        if self.phase != SessionPhase::DataConnecting {
288            return Vec::new();
289        }
290        if let FeagiEndpointState::Errored(e) = state {
291            self.fail(&format!("sensor errored: {e}"));
292            return Vec::new();
293        }
294        // The driver will send both SensorObserved and MotorObserved; we transition to Active when both are active.
295        Vec::new()
296    }
297
298    fn on_motor_observed(&mut self, state: FeagiEndpointState) -> Vec<SessionAction> {
299        if self.phase != SessionPhase::DataConnecting {
300            return Vec::new();
301        }
302        if let FeagiEndpointState::Errored(e) = state {
303            self.fail(&format!("motor errored: {e}"));
304            return Vec::new();
305        }
306        Vec::new()
307    }
308
309    fn on_deregistered(&mut self, response: DeregistrationResponse) -> Vec<SessionAction> {
310        if self.phase != SessionPhase::Deregistering && self.phase != SessionPhase::Registering {
311            // Server may respond NotRegistered at any time; accept it as terminal.
312        }
313        match response {
314            DeregistrationResponse::Success | DeregistrationResponse::NotRegistered => {
315                self.phase = SessionPhase::Idle;
316                self.session_id = None;
317                self.endpoints = None;
318                self.last_heartbeat_sent_at_ms = None;
319                self.connect_started_at_ms = None;
320            }
321        }
322        Vec::new()
323    }
324
325    fn fail(&mut self, msg: &str) {
326        self.phase = SessionPhase::Failed;
327        self.last_error = Some(msg.to_string());
328    }
329
330    /// Helper for drivers: after polling sensor and motor channels, call this to decide if
331    /// data channels are fully active and transition to Active.
332    pub fn try_mark_data_channels_active(
333        &mut self,
334        sensor_state: &FeagiEndpointState,
335        motor_state: &FeagiEndpointState,
336    ) {
337        if self.phase != SessionPhase::DataConnecting {
338            return;
339        }
340        let sensor_ok = matches!(
341            sensor_state,
342            FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData
343        );
344        let motor_ok = matches!(
345            motor_state,
346            FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData
347        );
348        if sensor_ok && motor_ok {
349            self.phase = SessionPhase::Active;
350        }
351    }
352}
353
354#[allow(clippy::large_enum_variant)]
355#[derive(Debug, Clone)]
356pub enum SessionEvent {
357    ControlObserved {
358        state: FeagiEndpointState,
359        message: Option<FeagiMessage>,
360    },
361    SensorObserved {
362        state: FeagiEndpointState,
363    },
364    MotorObserved {
365        state: FeagiEndpointState,
366    },
367    Deregistered {
368        response: DeregistrationResponse,
369    },
370}
371
372#[derive(Debug, Clone)]
373pub enum SessionAction {
374    ControlRequestConnect,
375    ControlSendRegistration {
376        agent_descriptor: AgentDescriptor,
377        auth_token: AuthToken,
378        requested_capabilities: Vec<AgentCapabilities>,
379    },
380    ControlSendHeartbeat,
381    ControlSendDeregistration {
382        reason: Option<String>,
383    },
384    SensorConnectTo {
385        endpoint: TransportProtocolEndpoint,
386    },
387    MotorConnectTo {
388        endpoint: TransportProtocolEndpoint,
389    },
390}