lambda_simulator/
state.rs

1//! Shared state management for the Lambda runtime simulator.
2
3use crate::invocation::{Invocation, InvocationError, InvocationResponse, InvocationStatus};
4use crate::simulator::SimulatorPhase;
5use chrono::{DateTime, Utc};
6use std::collections::{HashMap, VecDeque};
7use std::sync::atomic::{AtomicBool, Ordering};
8use tokio::sync::{Mutex, Notify};
9
/// Outcome reported by `RuntimeState::record_response` and
/// `RuntimeState::record_error`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecordResult {
    /// The response or error was stored and the invocation left `InProgress`.
    Recorded,
    /// Nothing was stored because the invocation was not `InProgress`
    /// (for example, a response or error had already been submitted).
    AlreadyCompleted,
    /// Nothing was stored because no invocation with that request ID is tracked.
    NotFound,
}
20
21/// Tracks the state of a single invocation throughout its lifecycle.
22#[derive(Debug, Clone)]
23pub struct InvocationState {
24    /// The original invocation request data (payload, request ID, deadline, etc.).
25    pub invocation: Invocation,
26
27    /// Current lifecycle status (Pending, InProgress, Success, or Error).
28    pub status: InvocationStatus,
29
30    /// Timestamp when the runtime received this invocation via `/next`.
31    /// `None` if still pending in the queue.
32    pub started_at: Option<DateTime<Utc>>,
33
34    /// The response payload if the invocation completed successfully.
35    pub response: Option<InvocationResponse>,
36
37    /// Error details if the invocation failed.
38    pub error: Option<InvocationError>,
39}
40
41/// Shared state for the runtime simulator.
42///
43/// This holds all the invocations, their states, and provides synchronization
44/// primitives for coordinating between the HTTP handlers and test code.
45///
46/// This type is internal to the simulator and not exposed in the public API.
47/// Users interact with the simulator through the `Simulator` type.
48#[derive(Debug)]
49pub(crate) struct RuntimeState {
50    /// Queue of pending invocations waiting to be processed.
51    pending_invocations: Mutex<VecDeque<Invocation>>,
52
53    /// Map of request IDs to their current state.
54    invocation_states: Mutex<HashMap<String, InvocationState>>,
55
56    /// Notifier for when a new invocation is enqueued.
57    invocation_available: Notify,
58
59    /// Notifier for when invocation state changes.
60    state_changed: Notify,
61
62    /// Current lifecycle phase.
63    phase: Mutex<SimulatorPhase>,
64
65    /// Notifier for when phase changes.
66    phase_changed: Notify,
67
68    /// Whether an initialization error has occurred.
69    init_error: Mutex<Option<String>>,
70
71    /// When the runtime was created (for init duration tracking).
72    init_started_at: DateTime<Utc>,
73
74    /// Whether init telemetry has already been emitted.
75    init_telemetry_emitted: AtomicBool,
76}
77
78impl RuntimeState {
79    /// Creates a new runtime state.
80    pub fn new() -> Self {
81        Self {
82            pending_invocations: Mutex::new(VecDeque::new()),
83            invocation_states: Mutex::new(HashMap::new()),
84            invocation_available: Notify::new(),
85            state_changed: Notify::new(),
86            phase: Mutex::new(SimulatorPhase::Initializing),
87            phase_changed: Notify::new(),
88            init_error: Mutex::new(None),
89            init_started_at: Utc::now(),
90            init_telemetry_emitted: AtomicBool::new(false),
91        }
92    }
93
94    /// Gets when the runtime state was created (init start time).
95    pub fn init_started_at(&self) -> DateTime<Utc> {
96        self.init_started_at
97    }
98
99    /// Marks init telemetry as emitted and returns whether it was already emitted.
100    ///
101    /// Returns `false` if this is the first call (init telemetry should be emitted),
102    /// or `true` if it was already emitted (skip emission).
103    pub fn mark_init_telemetry_emitted(&self) -> bool {
104        self.init_telemetry_emitted.swap(true, Ordering::SeqCst)
105    }
106
107    /// Enqueues a new invocation.
108    ///
109    /// # Arguments
110    ///
111    /// * `invocation` - The invocation to enqueue
112    pub(crate) async fn enqueue_invocation(&self, invocation: Invocation) {
113        let request_id = invocation.request_id.clone();
114
115        let state = InvocationState {
116            invocation: invocation.clone(),
117            status: InvocationStatus::Pending,
118            started_at: None,
119            response: None,
120            error: None,
121        };
122
123        self.invocation_states
124            .lock()
125            .await
126            .insert(request_id, state);
127
128        self.pending_invocations.lock().await.push_back(invocation);
129        self.invocation_available.notify_one();
130    }
131
132    /// Waits for and dequeues the next invocation.
133    ///
134    /// This will block until an invocation is available.
135    ///
136    /// # Returns
137    ///
138    /// The next invocation to process.
139    pub async fn next_invocation(&self) -> Invocation {
140        loop {
141            {
142                let mut queue = self.pending_invocations.lock().await;
143                if let Some(invocation) = queue.pop_front() {
144                    if let Some(state) = self
145                        .invocation_states
146                        .lock()
147                        .await
148                        .get_mut(&invocation.request_id)
149                    {
150                        state.status = InvocationStatus::InProgress;
151                        state.started_at = Some(Utc::now());
152                    }
153                    return invocation;
154                }
155            }
156
157            self.invocation_available.notified().await;
158        }
159    }
160
161    /// Records a successful invocation response.
162    ///
163    /// Only records if the invocation is still in `InProgress` status.
164    /// This implements "first wins" semantics - subsequent responses are ignored.
165    pub async fn record_response(&self, response: InvocationResponse) -> RecordResult {
166        let mut states = self.invocation_states.lock().await;
167        let Some(state) = states.get_mut(&response.request_id) else {
168            return RecordResult::NotFound;
169        };
170
171        if state.status != InvocationStatus::InProgress {
172            return RecordResult::AlreadyCompleted;
173        }
174
175        state.status = InvocationStatus::Success;
176        state.response = Some(response);
177        drop(states);
178        self.state_changed.notify_waiters();
179        RecordResult::Recorded
180    }
181
182    /// Records an invocation error.
183    ///
184    /// Only records if the invocation is still in `InProgress` status.
185    /// This implements "first wins" semantics - subsequent errors are ignored.
186    pub async fn record_error(&self, error: InvocationError) -> RecordResult {
187        let mut states = self.invocation_states.lock().await;
188        let Some(state) = states.get_mut(&error.request_id) else {
189            return RecordResult::NotFound;
190        };
191
192        if state.status != InvocationStatus::InProgress {
193            return RecordResult::AlreadyCompleted;
194        }
195
196        state.status = InvocationStatus::Error;
197        state.error = Some(error);
198        drop(states);
199        self.state_changed.notify_waiters();
200        RecordResult::Recorded
201    }
202
203    /// Marks the runtime as initialized and transitions to Ready phase.
204    pub async fn mark_initialized(&self) {
205        *self.phase.lock().await = SimulatorPhase::Ready;
206        self.phase_changed.notify_waiters();
207    }
208
209    /// Marks the runtime as shutting down.
210    pub async fn mark_shutting_down(&self) {
211        *self.phase.lock().await = SimulatorPhase::ShuttingDown;
212        self.phase_changed.notify_waiters();
213    }
214
215    /// Checks if the runtime has been initialized.
216    pub async fn is_initialized(&self) -> bool {
217        matches!(
218            *self.phase.lock().await,
219            SimulatorPhase::Ready | SimulatorPhase::ShuttingDown
220        )
221    }
222
223    /// Gets the current lifecycle phase.
224    pub async fn get_phase(&self) -> SimulatorPhase {
225        *self.phase.lock().await
226    }
227
228    /// Waits for the simulator to reach a specific phase.
229    ///
230    /// # Arguments
231    ///
232    /// * `target_phase` - The phase to wait for
233    pub(crate) async fn wait_for_phase(&self, target_phase: SimulatorPhase) {
234        loop {
235            if *self.phase.lock().await == target_phase {
236                return;
237            }
238            self.phase_changed.notified().await;
239        }
240    }
241
242    /// Records an initialization error.
243    ///
244    /// # Arguments
245    ///
246    /// * `error` - The error message
247    pub async fn record_init_error(&self, error: String) {
248        *self.init_error.lock().await = Some(error);
249    }
250
251    /// Gets the initialization error if one occurred.
252    pub async fn get_init_error(&self) -> Option<String> {
253        self.init_error.lock().await.clone()
254    }
255
256    /// Waits for an invocation state change notification.
257    ///
258    /// This method blocks until any invocation state changes (response, error, or timeout).
259    /// It's used internally by wait helpers to efficiently wait for state transitions
260    /// without polling.
261    pub(crate) async fn wait_for_state_change(&self) {
262        self.state_changed.notified().await;
263    }
264
265    /// Gets the state of an invocation by request ID.
266    ///
267    /// # Arguments
268    ///
269    /// * `request_id` - The request ID to look up
270    ///
271    /// # Returns
272    ///
273    /// The invocation state if found.
274    pub async fn get_invocation_state(&self, request_id: &str) -> Option<InvocationState> {
275        self.invocation_states.lock().await.get(request_id).cloned()
276    }
277
278    /// Gets all invocation states.
279    pub async fn get_all_states(&self) -> Vec<InvocationState> {
280        self.invocation_states
281            .lock()
282            .await
283            .values()
284            .cloned()
285            .collect()
286    }
287}
288
289impl Default for RuntimeState {
290    fn default() -> Self {
291        Self::new()
292    }
293}