resonate-sdk 0.4.0

Resonate SDK for Rust
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::Arc;

use crate::send::{Sender, TaskRef};

/// Heartbeat trait for keeping task leases alive.
///
/// Implementations track a set of active tasks and periodically
/// send heartbeat requests to the server for all of them.
pub trait Heartbeat: Send + Sync {
    /// Add a task to the heartbeat set. Starts the heartbeat loop
    /// if this is the first tracked task.
    fn start(&self, task_id: &str, task_version: i64);

    /// Remove a task from the heartbeat set. Stops the heartbeat loop
    /// if no tasks remain.
    fn stop(&self, task_id: &str);

    /// Shut down the heartbeat entirely, clearing all tracked tasks
    /// and aborting the loop. Called on graceful shutdown.
    fn shutdown(&self);
}

/// No-op heartbeat for local mode.
pub struct NoopHeartbeat;

impl Heartbeat for NoopHeartbeat {
    fn start(&self, _task_id: &str, _task_version: i64) {}
    fn stop(&self, _task_id: &str) {}
    fn shutdown(&self) {}
}

/// Async heartbeat that sends task.heartbeat requests at regular intervals
/// for all currently tracked tasks.
///
/// Uses `Sender` (not raw `Transport`) so the request goes through the
/// standard protocol envelope with corrId, version header, etc.
pub struct AsyncHeartbeat {
    pid: String,
    interval_ms: u64,
    sender: Sender,
    active_tasks: Arc<Mutex<HashMap<String, i64>>>,
    handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
}

impl AsyncHeartbeat {
    pub fn new(pid: String, interval_ms: u64, sender: Sender) -> Self {
        Self {
            pid,
            interval_ms,
            sender,
            active_tasks: Arc::new(Mutex::new(HashMap::new())),
            handle: Mutex::new(None),
        }
    }

    /// Returns the number of currently tracked tasks.
    pub fn task_count(&self) -> usize {
        self.active_tasks.lock().len()
    }

    /// Returns whether the heartbeat loop is currently running.
    pub fn is_running(&self) -> bool {
        self.handle.lock().is_some()
    }

    /// Returns a snapshot of the currently tracked tasks (id → version).
    pub fn tracked_tasks(&self) -> HashMap<String, i64> {
        self.active_tasks.lock().clone()
    }

    /// Spawn the heartbeat loop if not already running.
    fn ensure_loop_running(&self) {
        let mut handle = self.handle.lock();
        if handle.is_some() {
            return;
        }

        let pid = self.pid.clone();
        let interval = std::time::Duration::from_millis(self.interval_ms);
        let sender = self.sender.clone();
        let active_tasks = self.active_tasks.clone();

        *handle = Some(tokio::spawn(async move {
            let mut ticker = tokio::time::interval(interval);
            loop {
                ticker.tick().await;

                // Snapshot active tasks under lock
                let tasks: Vec<TaskRef> = {
                    let map = active_tasks.lock();
                    map.iter()
                        .map(|(id, version)| TaskRef {
                            id: id.clone(),
                            version: *version,
                        })
                        .collect()
                };

                if tasks.is_empty() {
                    continue;
                }

                if let Err(e) = sender.task_heartbeat(&pid, tasks).await {
                    tracing::warn!(error = %e, "heartbeat failed");
                }
            }
        }));
    }
}

impl Heartbeat for AsyncHeartbeat {
    fn start(&self, task_id: &str, task_version: i64) {
        {
            let mut map = self.active_tasks.lock();
            map.insert(task_id.to_string(), task_version);
        }
        self.ensure_loop_running();
    }

    fn stop(&self, task_id: &str) {
        let is_empty = {
            let mut map = self.active_tasks.lock();
            map.remove(task_id);
            map.is_empty()
        };

        if is_empty {
            let mut handle = self.handle.lock();
            if let Some(h) = handle.take() {
                h.abort();
            }
        }
    }

    fn shutdown(&self) {
        self.active_tasks.lock().clear();
        let mut handle = self.handle.lock();
        if let Some(h) = handle.take() {
            h.abort();
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::TestHarness;

    fn test_heartbeat(sender: Sender) -> AsyncHeartbeat {
        AsyncHeartbeat::new("test-pid".to_string(), 50, sender)
    }

    // ── Task tracking ──────────────────────────────────────────────

    #[tokio::test]
    async fn start_adds_task_to_tracked_set() {
        let harness = TestHarness::new();
        let hb = test_heartbeat(harness.build_sender());

        hb.start("task-1", 1);
        assert_eq!(hb.task_count(), 1);

        let tasks = hb.tracked_tasks();
        assert_eq!(tasks.get("task-1"), Some(&1));

        hb.shutdown();
    }

    #[tokio::test]
    async fn start_multiple_tasks_all_tracked() {
        let harness = TestHarness::new();
        let hb = test_heartbeat(harness.build_sender());

        hb.start("task-1", 1);
        hb.start("task-2", 5);
        hb.start("task-3", 10);
        assert_eq!(hb.task_count(), 3);

        let tasks = hb.tracked_tasks();
        assert_eq!(tasks.get("task-1"), Some(&1));
        assert_eq!(tasks.get("task-2"), Some(&5));
        assert_eq!(tasks.get("task-3"), Some(&10));

        hb.shutdown();
    }

    #[tokio::test]
    async fn stop_removes_task_from_tracked_set() {
        let harness = TestHarness::new();
        let hb = test_heartbeat(harness.build_sender());

        hb.start("task-1", 1);
        hb.start("task-2", 2);
        assert_eq!(hb.task_count(), 2);

        hb.stop("task-1");
        assert_eq!(hb.task_count(), 1);
        assert!(!hb.tracked_tasks().contains_key("task-1"));
        assert_eq!(hb.tracked_tasks().get("task-2"), Some(&2));

        hb.shutdown();
    }

    #[tokio::test]
    async fn stop_nonexistent_task_is_harmless() {
        let harness = TestHarness::new();
        let hb = test_heartbeat(harness.build_sender());

        hb.start("task-1", 1);
        hb.stop("nonexistent");
        assert_eq!(hb.task_count(), 1);

        hb.shutdown();
    }

    #[tokio::test]
    async fn start_same_task_twice_updates_version() {
        let harness = TestHarness::new();
        let hb = test_heartbeat(harness.build_sender());

        hb.start("task-1", 1);
        hb.start("task-1", 5);
        assert_eq!(hb.task_count(), 1);
        assert_eq!(hb.tracked_tasks().get("task-1"), Some(&5));

        hb.shutdown();
    }

    // ── Loop lifecycle ─────────────────────────────────────────────

    #[tokio::test]
    async fn loop_not_running_initially() {
        let harness = TestHarness::new();
        let hb = test_heartbeat(harness.build_sender());

        assert!(!hb.is_running());
    }

    #[tokio::test]
    async fn loop_starts_on_first_task() {
        let harness = TestHarness::new();
        let hb = test_heartbeat(harness.build_sender());

        hb.start("task-1", 1);
        assert!(hb.is_running());

        hb.shutdown();
    }

    #[tokio::test]
    async fn loop_stays_running_while_tasks_remain() {
        let harness = TestHarness::new();
        let hb = test_heartbeat(harness.build_sender());

        hb.start("task-1", 1);
        hb.start("task-2", 2);
        assert!(hb.is_running());

        hb.stop("task-1");
        assert!(
            hb.is_running(),
            "loop should stay running with task-2 still active"
        );

        hb.shutdown();
    }

    #[tokio::test]
    async fn loop_stops_when_last_task_removed() {
        let harness = TestHarness::new();
        let hb = test_heartbeat(harness.build_sender());

        hb.start("task-1", 1);
        assert!(hb.is_running());

        hb.stop("task-1");
        assert!(!hb.is_running());
    }

    #[tokio::test]
    async fn loop_stops_when_last_of_multiple_tasks_removed() {
        let harness = TestHarness::new();
        let hb = test_heartbeat(harness.build_sender());

        hb.start("task-1", 1);
        hb.start("task-2", 2);
        hb.stop("task-1");
        assert!(hb.is_running());

        hb.stop("task-2");
        assert!(!hb.is_running());
    }

    #[tokio::test]
    async fn loop_restarts_after_stop_and_new_start() {
        let harness = TestHarness::new();
        let hb = test_heartbeat(harness.build_sender());

        hb.start("task-1", 1);
        assert!(hb.is_running());

        hb.stop("task-1");
        assert!(!hb.is_running());

        hb.start("task-2", 2);
        assert!(hb.is_running());

        hb.shutdown();
    }

    // ── Shutdown ───────────────────────────────────────────────────

    #[tokio::test]
    async fn shutdown_clears_all_tasks_and_stops_loop() {
        let harness = TestHarness::new();
        let hb = test_heartbeat(harness.build_sender());

        hb.start("task-1", 1);
        hb.start("task-2", 2);
        hb.start("task-3", 3);
        assert_eq!(hb.task_count(), 3);
        assert!(hb.is_running());

        hb.shutdown();
        assert_eq!(hb.task_count(), 0);
        assert!(!hb.is_running());
    }

    #[tokio::test]
    async fn shutdown_when_already_idle_is_harmless() {
        let harness = TestHarness::new();
        let hb = test_heartbeat(harness.build_sender());

        hb.shutdown();
        assert_eq!(hb.task_count(), 0);
        assert!(!hb.is_running());
    }

    // ── Heartbeat sends ────────────────────────────────────────────

    #[tokio::test]
    async fn heartbeat_sends_request_with_tracked_tasks() {
        let harness = TestHarness::new();
        let hb = test_heartbeat(harness.build_sender());

        hb.start("task-1", 1);
        hb.start("task-2", 5);

        // Wait for at least one heartbeat tick
        tokio::time::sleep(std::time::Duration::from_millis(120)).await;

        hb.shutdown();

        let requests = harness.sent_requests_json().await;
        let heartbeats: Vec<_> = requests
            .iter()
            .filter(|r| r["kind"] == "task.heartbeat")
            .collect();

        assert!(
            !heartbeats.is_empty(),
            "should have sent at least one heartbeat"
        );

        // Check the last heartbeat contains both tasks
        let last_hb = heartbeats.last().unwrap();
        assert_eq!(last_hb["pid"], "test-pid");

        let tasks = last_hb["tasks"].as_array().unwrap();
        assert_eq!(tasks.len(), 2);

        let ids: Vec<&str> = tasks.iter().map(|t| t["id"].as_str().unwrap()).collect();
        assert!(ids.contains(&"task-1"));
        assert!(ids.contains(&"task-2"));
    }

    #[tokio::test]
    async fn heartbeat_reflects_task_removal() {
        let harness = TestHarness::new();
        let hb = test_heartbeat(harness.build_sender());

        hb.start("task-1", 1);
        hb.start("task-2", 2);

        // Wait for a heartbeat with both tasks
        tokio::time::sleep(std::time::Duration::from_millis(80)).await;

        // Remove task-1
        hb.stop("task-1");

        // Wait for another heartbeat with only task-2
        tokio::time::sleep(std::time::Duration::from_millis(80)).await;

        hb.shutdown();

        let requests = harness.sent_requests_json().await;
        let heartbeats: Vec<_> = requests
            .iter()
            .filter(|r| r["kind"] == "task.heartbeat")
            .collect();

        // The last heartbeat should only contain task-2
        let last_hb = heartbeats.last().unwrap();
        let tasks = last_hb["tasks"].as_array().unwrap();
        assert_eq!(tasks.len(), 1);
        assert_eq!(tasks[0]["id"], "task-2");
    }

    // ── NoopHeartbeat ──────────────────────────────────────────────

    #[test]
    fn noop_heartbeat_start_stop_shutdown_are_harmless() {
        let hb = NoopHeartbeat;
        hb.start("task-1", 1);
        hb.start("task-2", 2);
        hb.stop("task-1");
        hb.stop("nonexistent");
        hb.shutdown();
    }
}