// lambda-simulator 0.1.5
//
// High-fidelity AWS Lambda Runtime API simulator for testing Lambda runtimes and
// extensions locally.

//! Extension readiness tracking for Lambda lifecycle coordination.
//!
//! This module tracks when extensions have completed their post-invocation work
//! and are ready for the next invocation. This is critical for accurate lifecycle
//! simulation because Lambda waits for all extensions to be ready before:
//! - Emitting `platform.report` telemetry
//! - Freezing the process
//!
//! The "extension overhead" time is the duration between when the runtime completes
//! its response and when all extensions signal they are ready by polling `/next`.

use chrono::{DateTime, Utc};
use std::collections::{HashMap, HashSet};
use tokio::sync::{Mutex, Notify};

/// Tracks extension readiness for a specific invocation.
///
/// One value is stored per request ID inside `ExtensionReadinessTracker`.
/// The invocation counts as fully ready once every entry of
/// `expected_extensions` is also present in `ready_extensions`.
#[derive(Debug)]
struct InvocationReadiness {
    /// Extensions that are subscribed to INVOKE events for this invocation.
    /// Supplied once at construction time.
    expected_extensions: HashSet<String>,

    /// Extensions that have signalled readiness by polling /next.
    /// Compared against `expected_extensions` to decide overall readiness.
    ready_extensions: HashSet<String>,

    /// Extensions that have called /next but before runtime was done.
    /// These will be promoted to ready_extensions when runtime finishes.
    pending_ready: HashSet<String>,

    /// When the runtime completed its response.
    /// `None` until the runtime has been marked done for this invocation.
    runtime_done_at: Option<DateTime<Utc>>,

    /// When all extensions became ready.
    /// `None` until the expected set has been fully satisfied.
    extensions_ready_at: Option<DateTime<Utc>>,
}

impl InvocationReadiness {
    /// Creates the bookkeeping entry for an invocation whose INVOKE
    /// subscribers are `expected_extensions`. No extension is ready or
    /// pending yet and neither timestamp is set.
    fn new(expected_extensions: HashSet<String>) -> Self {
        let ready_extensions = HashSet::new();
        let pending_ready = HashSet::new();
        Self {
            expected_extensions,
            ready_extensions,
            pending_ready,
            runtime_done_at: None,
            extensions_ready_at: None,
        }
    }

    /// Returns `true` when every expected extension has been recorded as
    /// ready. An empty expected set is trivially ready.
    fn all_ready(&self) -> bool {
        self.expected_extensions
            .iter()
            .all(|id| self.ready_extensions.contains(id))
    }

    /// Returns the time between runtime completion and full extension
    /// readiness, in whole milliseconds expressed as `f64`.
    ///
    /// Yields `None` unless both timestamps have been recorded.
    fn extension_overhead_ms(&self) -> Option<f64> {
        let done = self.runtime_done_at?;
        let ready = self.extensions_ready_at?;
        let elapsed = ready - done;
        Some(elapsed.num_milliseconds() as f64)
    }
}

/// Tracks extension readiness across all invocations.
///
/// This tracker coordinates between the runtime API (which knows when invocations
/// complete) and the extensions API (which knows when extensions poll /next).
///
/// # Lifecycle Flow
///
/// 1. When an invocation is enqueued, `start_invocation()` is called with the
///    list of extension IDs that subscribed to INVOKE events.
/// 2. When the runtime posts its response, `mark_runtime_done()` is called.
/// 3. Each extension has an "overhead window" to do post-invocation work.
/// 4. When an extension polls `/next`, `mark_extension_ready()` is called.
///    A poll that arrives before the runtime is done is remembered as pending
///    and promoted to ready by `mark_runtime_done()`.
/// 5. When all extensions are ready, `wait_for_all_ready()` returns.
///
/// If no extensions are registered for INVOKE events, the invocation is
/// immediately considered ready. Request IDs the tracker has no entry for
/// are also reported as ready by `is_all_ready()`.
#[derive(Debug)]
pub struct ExtensionReadinessTracker {
    /// Per-invocation readiness tracking.
    /// Keyed by request ID; entries are removed by `cleanup_invocation()`.
    invocations: Mutex<HashMap<String, InvocationReadiness>>,

    /// Notifier for readiness state changes.
    /// Tasks parked in `wait_for_all_ready()` re-check readiness when woken.
    readiness_changed: Notify,

    /// The currently in-progress request ID.
    /// Set when invocation starts, used to track extensions that call /next before runtime done.
    current_request: Mutex<Option<String>>,

    /// The most recently completed request ID.
    /// Used to determine if an extension polling /next is signalling readiness
    /// for a completed invocation or waiting for a new one.
    last_completed_request: Mutex<Option<String>>,
}

impl ExtensionReadinessTracker {
    /// Builds an empty tracker: no invocations are recorded and neither a
    /// current nor a last-completed request is set.
    pub fn new() -> Self {
        let invocations = Mutex::new(HashMap::new());
        let current_request = Mutex::new(None);
        let last_completed_request = Mutex::new(None);

        Self {
            invocations,
            readiness_changed: Notify::new(),
            current_request,
            last_completed_request,
        }
    }

    /// Begins readiness tracking for an invocation and makes it the current
    /// request.
    ///
    /// Any existing entry stored under the same request ID is replaced.
    ///
    /// # Arguments
    ///
    /// * `request_id` - The request ID of the invocation
    /// * `invoke_extension_ids` - Extension IDs that subscribed to INVOKE
    ///   events; duplicates collapse because the IDs are stored as a set
    pub async fn start_invocation(&self, request_id: &str, invoke_extension_ids: Vec<String>) {
        let subscribed = invoke_extension_ids.into_iter().collect::<HashSet<String>>();
        let entry = InvocationReadiness::new(subscribed);

        {
            // Scope the map guard so it is released before the current-request
            // marker is updated.
            let mut tracked = self.invocations.lock().await;
            tracked.insert(request_id.to_owned(), entry);
        }

        let mut current = self.current_request.lock().await;
        *current = Some(request_id.to_owned());
    }

    /// Marks the runtime as done processing an invocation.
    ///
    /// This is called when the runtime posts its response or error. After this,
    /// the "extension overhead" window begins.
    ///
    /// Extensions that polled `/next` while the runtime was still running are
    /// promoted from pending to ready here. If that already satisfies every
    /// expected extension, the ready timestamp is set to the same instant as
    /// the runtime-done timestamp, so the reported overhead is exactly zero.
    ///
    /// The request is recorded as the most recently completed one even when
    /// the tracker holds no entry for it, and waiters are always notified.
    ///
    /// # Arguments
    ///
    /// * `request_id` - The request ID of the completed invocation
    pub async fn mark_runtime_done(&self, request_id: &str) {
        let mut invocations = self.invocations.lock().await;
        if let Some(readiness) = invocations.get_mut(request_id) {
            // A single timestamp for both fields: when nothing is left to wait
            // for, the overhead window has zero length by definition.
            let now = Utc::now();
            readiness.runtime_done_at = Some(now);

            // Promote any extensions that called /next before runtime was done
            readiness
                .ready_extensions
                .extend(readiness.pending_ready.drain());

            if readiness.all_ready() {
                readiness.extensions_ready_at = Some(now);
            }
        }

        // The map guard is intentionally still held here so that
        // `runtime_done_at` and `last_completed_request` change as a unit with
        // respect to any other task that holds the map lock.
        *self.last_completed_request.lock().await = Some(request_id.to_string());

        // One wake-up is enough: it covers both "runtime done" and, when
        // applicable, "all extensions ready".
        self.readiness_changed.notify_waiters();
    }

    /// Marks an extension as ready for the next invocation.
    ///
    /// This is called when an extension polls `/next`. The poll is interpreted
    /// against the tracker state in this order:
    ///
    /// 1. If the most recently completed invocation has had its runtime marked
    ///    done and is still waiting on extensions, the extension is added to
    ///    that invocation's ready set. When this completes the set, the ready
    ///    timestamp is recorded and waiters are notified.
    /// 2. Otherwise, if the current invocation's runtime is still running and
    ///    the extension is one of its expected extensions, the poll is
    ///    remembered as "pending ready" and promoted by `mark_runtime_done()`.
    ///
    /// # Arguments
    ///
    /// * `extension_id` - The ID of the extension signalling readiness
    ///
    /// # Returns
    ///
    /// `true` if the poll was recorded against an invocation (as ready or as
    /// pending ready), `false` if there was no invocation to record it for.
    pub async fn mark_extension_ready(&self, extension_id: &str) -> bool {
        // Take the map lock first and only then read the request markers.
        // `mark_runtime_done()` sets `runtime_done_at` and
        // `last_completed_request` while holding this same lock, so reading
        // the markers under it yields a consistent view. Reading them before
        // locking allowed a completion to slip in between, after which neither
        // branch below matched and the readiness signal was silently dropped.
        let mut invocations = self.invocations.lock().await;
        let current_request = self.current_request.lock().await.clone();
        let last_request = self.last_completed_request.lock().await.clone();

        // First, check if there's a completed request waiting for readiness
        if let Some(ref request_id) = last_request
            && let Some(readiness) = invocations.get_mut(request_id)
            && readiness.runtime_done_at.is_some()
            && !readiness.all_ready()
        {
            readiness.ready_extensions.insert(extension_id.to_string());

            if readiness.all_ready() {
                readiness.extensions_ready_at = Some(Utc::now());
                self.readiness_changed.notify_waiters();
            }
            return true;
        }

        // If no completed request or already ready, check if there's a current
        // in-progress request where we should track this as pending ready
        if let Some(ref request_id) = current_request
            && let Some(readiness) = invocations.get_mut(request_id)
            && readiness.runtime_done_at.is_none()
            && readiness.expected_extensions.contains(extension_id)
        {
            // Runtime not done yet, but extension called /next - mark as pending
            readiness.pending_ready.insert(extension_id.to_string());
            return true;
        }

        false
    }

    /// Reports whether every expected extension is ready for an invocation.
    ///
    /// # Arguments
    ///
    /// * `request_id` - The request ID to check
    ///
    /// # Returns
    ///
    /// `true` if all expected extensions have signalled readiness, if no
    /// extensions were subscribed to INVOKE events, or if the tracker holds
    /// no entry for `request_id`.
    pub async fn is_all_ready(&self, request_id: &str) -> bool {
        let tracked = self.invocations.lock().await;
        match tracked.get(request_id) {
            Some(entry) => entry.all_ready(),
            // Nothing is tracked under this ID, so there is nothing to wait for.
            None => true,
        }
    }

    /// Waits for all extensions to be ready for a specific invocation.
    ///
    /// This method suspends the calling task until all extensions that
    /// subscribed to INVOKE events have polled `/next`, signalling they are
    /// ready for the next invocation. It returns immediately when the
    /// invocation is already ready or when `request_id` is not tracked.
    ///
    /// # Arguments
    ///
    /// * `request_id` - The request ID to wait for
    pub async fn wait_for_all_ready(&self, request_id: &str) {
        loop {
            // Create the `Notified` future *before* checking the state.
            // `notify_waiters()` stores no permit, but a `Notified` future
            // receives its wake-ups from the moment it is created. A state
            // change that lands between the check and the await is therefore
            // not lost; checking first and subscribing afterwards could miss
            // it and park this task forever.
            let state_changed = self.readiness_changed.notified();

            if self.is_all_ready(request_id).await {
                return;
            }

            state_changed.await;
        }
    }

    /// Returns the extension overhead recorded for an invocation.
    ///
    /// The overhead is the time between when the runtime completed its
    /// response and when all extensions signalled readiness.
    ///
    /// # Arguments
    ///
    /// * `request_id` - The request ID to get overhead for
    ///
    /// # Returns
    ///
    /// The overhead in milliseconds, or `None` if the request is not tracked,
    /// the runtime has not completed, or extensions are not yet ready.
    pub async fn get_extension_overhead_ms(&self, request_id: &str) -> Option<f64> {
        let tracked = self.invocations.lock().await;
        let entry = tracked.get(request_id)?;
        entry.extension_overhead_ms()
    }

    /// Lists the extensions recorded as ready for an invocation.
    ///
    /// The order of the returned IDs is unspecified because they come from a
    /// hash set. An untracked request ID yields an empty list.
    ///
    /// # Arguments
    ///
    /// * `request_id` - The request ID to check
    #[allow(dead_code)]
    pub async fn get_ready_extensions(&self, request_id: &str) -> Vec<String> {
        let tracked = self.invocations.lock().await;
        match tracked.get(request_id) {
            Some(entry) => entry.ready_extensions.iter().cloned().collect(),
            None => Vec::new(),
        }
    }

    /// Lists the expected extensions that have not yet been recorded as ready
    /// for an invocation.
    ///
    /// The order of the returned IDs is unspecified because they come from a
    /// hash set. An untracked request ID yields an empty list.
    ///
    /// # Arguments
    ///
    /// * `request_id` - The request ID to check
    #[allow(dead_code)]
    pub async fn get_pending_extensions(&self, request_id: &str) -> Vec<String> {
        let tracked = self.invocations.lock().await;
        let Some(entry) = tracked.get(request_id) else {
            return Vec::new();
        };

        // Expected minus ready, walking the expected set.
        entry
            .expected_extensions
            .iter()
            .filter(|id| !entry.ready_extensions.contains(*id))
            .cloned()
            .collect()
    }

    /// Cleans up completed invocation tracking data.
    ///
    /// Call this after an invocation is fully complete to free memory.
    ///
    /// Once the entry is gone, `is_all_ready()` reports the request as ready,
    /// so readiness waiters are notified whenever an entry was actually
    /// removed. Without that wake-up, a task still parked in
    /// `wait_for_all_ready()` for this request would never observe the change.
    ///
    /// # Arguments
    ///
    /// * `request_id` - The request ID to clean up
    pub async fn cleanup_invocation(&self, request_id: &str) {
        // The map guard is a temporary and is released at the end of this
        // statement, before waiters are woken.
        let removed = self.invocations.lock().await.remove(request_id);

        if removed.is_some() {
            self.readiness_changed.notify_waiters();
        }
    }

    /// Wakes every task currently waiting on a readiness state change.
    ///
    /// Crate-internal hook for signalling that tracked state has changed.
    #[allow(dead_code)]
    pub(crate) fn notify_readiness_changed(&self) {
        let notifier = &self.readiness_changed;
        notifier.notify_waiters();
    }
}

impl Default for ExtensionReadinessTracker {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Request ID shared by the tests that track a single invocation.
    const REQ: &str = "req-1";

    /// Converts string literals into the owned IDs `start_invocation` takes.
    fn ids(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| name.to_string()).collect()
    }

    /// Builds a tracker on which `REQ` has been started with `extensions`
    /// and the runtime has already been marked done.
    async fn tracker_after_runtime_done(extensions: &[&str]) -> ExtensionReadinessTracker {
        let tracker = ExtensionReadinessTracker::new();
        tracker.start_invocation(REQ, ids(extensions)).await;
        tracker.mark_runtime_done(REQ).await;
        tracker
    }

    #[tokio::test]
    async fn test_no_extensions_is_immediately_ready() {
        let tracker = tracker_after_runtime_done(&[]).await;

        assert!(tracker.is_all_ready(REQ).await);
    }

    #[tokio::test]
    async fn test_wait_for_all_ready_returns_immediately_with_no_extensions() {
        let tracker = tracker_after_runtime_done(&[]).await;

        let budget = std::time::Duration::from_millis(100);
        let outcome = tokio::time::timeout(budget, tracker.wait_for_all_ready(REQ)).await;

        assert!(
            outcome.is_ok(),
            "wait_for_all_ready should return immediately with no extensions"
        );
    }

    #[tokio::test]
    async fn test_single_extension_readiness() {
        let tracker = tracker_after_runtime_done(&["ext-1"]).await;
        assert!(!tracker.is_all_ready(REQ).await);

        tracker.mark_extension_ready("ext-1").await;
        assert!(tracker.is_all_ready(REQ).await);
    }

    #[tokio::test]
    async fn test_multiple_extensions_readiness() {
        let tracker = tracker_after_runtime_done(&["ext-1", "ext-2", "ext-3"]).await;
        assert!(!tracker.is_all_ready(REQ).await);

        // The first two polls must not be enough on their own.
        for ext in ["ext-1", "ext-2"] {
            tracker.mark_extension_ready(ext).await;
            assert!(!tracker.is_all_ready(REQ).await);
        }

        tracker.mark_extension_ready("ext-3").await;
        assert!(tracker.is_all_ready(REQ).await);
    }

    #[tokio::test]
    async fn test_pending_extensions() {
        let tracker = tracker_after_runtime_done(&["ext-1", "ext-2"]).await;

        // Results come from a hash set, so sort before comparing.
        let mut pending = tracker.get_pending_extensions(REQ).await;
        pending.sort();
        assert_eq!(pending, ["ext-1", "ext-2"]);

        tracker.mark_extension_ready("ext-1").await;

        let pending = tracker.get_pending_extensions(REQ).await;
        assert_eq!(pending, ["ext-2"]);

        let ready = tracker.get_ready_extensions(REQ).await;
        assert_eq!(ready, ["ext-1"]);
    }

    #[tokio::test]
    async fn test_extension_overhead_calculation() {
        let tracker = tracker_after_runtime_done(&["ext-1"]).await;

        // No overhead is available until the extension reports in.
        assert!(tracker.get_extension_overhead_ms(REQ).await.is_none());

        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        tracker.mark_extension_ready("ext-1").await;

        let overhead = tracker.get_extension_overhead_ms(REQ).await;
        assert!(matches!(overhead, Some(ms) if ms >= 10.0));
    }

    #[tokio::test]
    async fn test_cleanup_invocation() {
        let tracker = ExtensionReadinessTracker::new();
        tracker.start_invocation(REQ, ids(&["ext-1"])).await;
        assert!(!tracker.is_all_ready(REQ).await);

        // Once the entry is removed the request is unknown, hence "ready".
        tracker.cleanup_invocation(REQ).await;
        assert!(tracker.is_all_ready(REQ).await);
    }

    #[tokio::test]
    async fn test_unknown_request_is_ready() {
        let tracker = ExtensionReadinessTracker::new();

        assert!(tracker.is_all_ready("nonexistent").await);
    }
}