lambda_simulator/
state.rs1use crate::invocation::{Invocation, InvocationError, InvocationResponse, InvocationStatus};
4use crate::simulator::SimulatorPhase;
5use chrono::{DateTime, Utc};
6use std::collections::{HashMap, VecDeque};
7use std::sync::atomic::{AtomicBool, Ordering};
8use tokio::sync::{Mutex, Notify};
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
12pub enum RecordResult {
13 Recorded,
15 AlreadyCompleted,
17 NotFound,
19}
20
21#[derive(Debug, Clone)]
23pub struct InvocationState {
24 pub invocation: Invocation,
26
27 pub status: InvocationStatus,
29
30 pub started_at: Option<DateTime<Utc>>,
33
34 pub response: Option<InvocationResponse>,
36
37 pub error: Option<InvocationError>,
39}
40
41#[derive(Debug)]
49pub(crate) struct RuntimeState {
50 pending_invocations: Mutex<VecDeque<Invocation>>,
52
53 invocation_states: Mutex<HashMap<String, InvocationState>>,
55
56 invocation_available: Notify,
58
59 state_changed: Notify,
61
62 phase: Mutex<SimulatorPhase>,
64
65 phase_changed: Notify,
67
68 init_error: Mutex<Option<String>>,
70
71 init_started_at: DateTime<Utc>,
73
74 init_telemetry_emitted: AtomicBool,
76}
77
78impl RuntimeState {
79 pub fn new() -> Self {
81 Self {
82 pending_invocations: Mutex::new(VecDeque::new()),
83 invocation_states: Mutex::new(HashMap::new()),
84 invocation_available: Notify::new(),
85 state_changed: Notify::new(),
86 phase: Mutex::new(SimulatorPhase::Initializing),
87 phase_changed: Notify::new(),
88 init_error: Mutex::new(None),
89 init_started_at: Utc::now(),
90 init_telemetry_emitted: AtomicBool::new(false),
91 }
92 }
93
94 pub fn init_started_at(&self) -> DateTime<Utc> {
96 self.init_started_at
97 }
98
99 pub fn mark_init_telemetry_emitted(&self) -> bool {
104 self.init_telemetry_emitted.swap(true, Ordering::SeqCst)
105 }
106
107 pub(crate) async fn enqueue_invocation(&self, invocation: Invocation) {
113 let request_id = invocation.request_id.clone();
114
115 let state = InvocationState {
116 invocation: invocation.clone(),
117 status: InvocationStatus::Pending,
118 started_at: None,
119 response: None,
120 error: None,
121 };
122
123 self.invocation_states
124 .lock()
125 .await
126 .insert(request_id, state);
127
128 self.pending_invocations.lock().await.push_back(invocation);
129 self.invocation_available.notify_one();
130 }
131
132 pub async fn next_invocation(&self) -> Invocation {
140 loop {
141 {
142 let mut queue = self.pending_invocations.lock().await;
143 if let Some(invocation) = queue.pop_front() {
144 if let Some(state) = self
145 .invocation_states
146 .lock()
147 .await
148 .get_mut(&invocation.request_id)
149 {
150 state.status = InvocationStatus::InProgress;
151 state.started_at = Some(Utc::now());
152 }
153 return invocation;
154 }
155 }
156
157 self.invocation_available.notified().await;
158 }
159 }
160
161 pub async fn record_response(&self, response: InvocationResponse) -> RecordResult {
166 let mut states = self.invocation_states.lock().await;
167 let Some(state) = states.get_mut(&response.request_id) else {
168 return RecordResult::NotFound;
169 };
170
171 if state.status != InvocationStatus::InProgress {
172 return RecordResult::AlreadyCompleted;
173 }
174
175 state.status = InvocationStatus::Success;
176 state.response = Some(response);
177 drop(states);
178 self.state_changed.notify_waiters();
179 RecordResult::Recorded
180 }
181
182 pub async fn record_error(&self, error: InvocationError) -> RecordResult {
187 let mut states = self.invocation_states.lock().await;
188 let Some(state) = states.get_mut(&error.request_id) else {
189 return RecordResult::NotFound;
190 };
191
192 if state.status != InvocationStatus::InProgress {
193 return RecordResult::AlreadyCompleted;
194 }
195
196 state.status = InvocationStatus::Error;
197 state.error = Some(error);
198 drop(states);
199 self.state_changed.notify_waiters();
200 RecordResult::Recorded
201 }
202
203 pub async fn mark_initialized(&self) {
205 *self.phase.lock().await = SimulatorPhase::Ready;
206 self.phase_changed.notify_waiters();
207 }
208
209 pub async fn mark_shutting_down(&self) {
211 *self.phase.lock().await = SimulatorPhase::ShuttingDown;
212 self.phase_changed.notify_waiters();
213 }
214
215 pub async fn is_initialized(&self) -> bool {
217 matches!(
218 *self.phase.lock().await,
219 SimulatorPhase::Ready | SimulatorPhase::ShuttingDown
220 )
221 }
222
223 pub async fn get_phase(&self) -> SimulatorPhase {
225 *self.phase.lock().await
226 }
227
228 pub(crate) async fn wait_for_phase(&self, target_phase: SimulatorPhase) {
234 loop {
235 if *self.phase.lock().await == target_phase {
236 return;
237 }
238 self.phase_changed.notified().await;
239 }
240 }
241
242 pub async fn record_init_error(&self, error: String) {
248 *self.init_error.lock().await = Some(error);
249 }
250
251 pub async fn get_init_error(&self) -> Option<String> {
253 self.init_error.lock().await.clone()
254 }
255
256 pub(crate) async fn wait_for_state_change(&self) {
262 self.state_changed.notified().await;
263 }
264
265 pub async fn get_invocation_state(&self, request_id: &str) -> Option<InvocationState> {
275 self.invocation_states.lock().await.get(request_id).cloned()
276 }
277
278 pub async fn get_all_states(&self) -> Vec<InvocationState> {
280 self.invocation_states
281 .lock()
282 .await
283 .values()
284 .cloned()
285 .collect()
286 }
287}
288
289impl Default for RuntimeState {
290 fn default() -> Self {
291 Self::new()
292 }
293}