pg_lease 0.1.3

Postgres lease management package for running a single looping function while a lease is held
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
use std::{error::Error, future::Future, sync::Arc, time::Duration};

use chrono::{DateTime, Utc};
use rand::Rng;
use sqlx::{Pool, Postgres};
use tokio::{sync::Mutex, task::AbortHandle};

/// Identifies the lease under which a looper invocation is running.
///
/// A fresh value is built and passed as the first argument to the looper function each
/// time the lease has been acquired.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LeaseContext {
    /// Name of the lease (the `name` key of the row in `_pg_lease`).
    pub lease_name: String,
    /// Identifier of the worker that acquired the lease.
    pub worker_id: String,
}

/// A lease-based looper that ensures only one worker can execute a task at a time
/// across multiple processes/machines using PostgreSQL as the coordination mechanism.
///
/// The looper function receives a [`LeaseContext`] naming the lease and worker, plus a
/// clone of the shared state of type `S`, allowing you to pass application-specific data
/// (similar to Axum's state handling).
///
/// # Example
///
/// ```ignore
/// use std::sync::Arc;
/// use std::time::Duration;
/// use tokio::sync::Mutex;
/// use pg_lease::{LeaseContext, LeaseLooper, LeaseLooperOptions};
///
/// #[derive(Clone)]
/// struct AppState {
///     counter: Arc<Mutex<i32>>,
///     config: String,
/// }
///
/// let state = AppState {
///     counter: Arc::new(Mutex::new(0)),
///     config: "production".to_string(),
/// };
///
/// // The looper function takes the lease context first, then the shared state.
/// let looper_func = |_ctx: LeaseContext, state: AppState| async move {
///     let mut counter = state.counter.lock().await;
///     *counter += 1;
///     println!("Counter: {}, Config: {}", *counter, state.config);
///     Ok(())
/// };
///
/// let looper = LeaseLooper::new(
///     "my-task".to_string(),
///     looper_func,
///     "worker-1".to_string(),
///     pool,
///     LeaseLooperOptions {
///         loop_interval: Duration::from_secs(1),
///         loop_interval_jitter: Duration::from_millis(250),
///         lease_duration: Duration::from_secs(10),
///         lease_heartbeat_interval: Duration::from_secs(3),
///     },
///     state,
/// );
///
/// looper.start().await?;
/// ```
pub struct LeaseLooper<T, Fut, S>
where
    T: Fn(LeaseContext, S) -> Fut + Send + Sync + Clone + 'static,
    Fut: Future<Output = Result<(), Box<dyn Error + Send + Sync>>> + Send + 'static,
    S: Clone + Send + Sync + 'static,
{
    // Key of the lease row in `_pg_lease`.
    lease_name: String,
    // Identifier written into the lease row while this looper holds it.
    worker_id: String,
    // User function run in its own task each time the lease is acquired.
    looper_func: T,
    // Shared state; cloned and handed to `looper_func` on every run.
    state: S,
    // Polling, lease-duration and heartbeat timings.
    options: LeaseLooperOptions,
    // Connection pool used for all lease queries.
    pool: Pool<Postgres>,

    // Abort handle of the task spawned by `start`; `None` until `start` runs and again
    // after `stop` takes it.
    abort_handle: Arc<Mutex<Option<AbortHandle>>>,
}

/// Timing configuration for a [`LeaseLooper`].
///
/// `lease_heartbeat_interval` should be comfortably shorter than `lease_duration`;
/// otherwise the lease expires before the heartbeat can renew it.
#[derive(Debug, Clone, Copy)]
pub struct LeaseLooperOptions {
    /// Base delay between attempts to acquire the lease, and the back-off used after an
    /// acquisition error.
    pub loop_interval: Duration,
    /// Exclusive upper bound of the random extra delay added to `loop_interval` while
    /// polling for the lease. Zero disables jitter.
    pub loop_interval_jitter: Duration,
    /// How far into the future `held_until` is set on acquisition and on each heartbeat.
    pub lease_duration: Duration,
    /// Delay between heartbeats that renew a held lease.
    pub lease_heartbeat_interval: Duration,
}

impl Default for LeaseLooperOptions {
    /// Returns usable timings: poll every second without jitter, hold the lease for ten
    /// seconds, and renew it every three seconds.
    ///
    /// A derived `Default` would zero every duration, which makes a freshly acquired lease
    /// expire immediately and turns the heartbeat into a busy loop.
    fn default() -> Self {
        Self {
            loop_interval: Duration::from_secs(1),
            loop_interval_jitter: Duration::ZERO,
            lease_duration: Duration::from_secs(10),
            lease_heartbeat_interval: Duration::from_secs(3),
        }
    }
}

impl<T, Fut, S> LeaseLooper<T, Fut, S>
where
    T: Fn(LeaseContext, S) -> Fut + Send + Sync + Clone + 'static,
    Fut: Future<Output = Result<(), Box<dyn Error + Send + Sync>>> + Send + 'static,
    S: Clone + Send + Sync + 'static,
{
    /// Builds a looper for the lease `lease_name`, identified as `worker_id`.
    ///
    /// Nothing is spawned and no query is issued here; call `start` to begin polling for
    /// the lease. `state` is cloned and passed to `looper_func` on every run.
    pub fn new(
        lease_name: String,
        looper_func: T,
        worker_id: String,
        pool: Pool<Postgres>,
        options: LeaseLooperOptions,
        state: S,
    ) -> Self {
        // Empty slot: there is no background task to abort until `start` is called.
        let abort_handle = Arc::new(Mutex::new(None));

        Self {
            abort_handle,
            pool,
            options,
            state,
            looper_func,
            worker_id,
            lease_name,
        }
    }

    /// Starts the lease looper, which will poll for a lease and execute the looper function when the lease is acquired.
    ///
    /// The work runs in a spawned background task; this method returns as soon as the task
    /// has been spawned. Calling `start` again while that task is still alive is a no-op.
    /// Once the task has finished, or after `stop`, `start` spawns a fresh one.
    ///
    /// Currently always returns `Ok(())`.
    pub async fn start(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
        let mut abort_handle = self.abort_handle.lock().await;

        // Spawning a second looper with the same worker id would let both believe they
        // hold the lease, and overwriting the stored handle would make the first task
        // impossible to stop. Keep the running task instead.
        if let Some(running) = &*abort_handle {
            if !running.is_finished() {
                return Ok(());
            }
        }

        let join_handle = tokio::task::spawn(Self::launch_looper(
            self.looper_func.clone(),
            self.pool.clone(),
            self.options,
            self.lease_name.clone(),
            self.worker_id.clone(),
            self.state.clone(),
        ));

        *abort_handle = Some(join_handle.abort_handle());

        Ok(())
    }

    async fn launch_looper(
        looper_func: T,
        pool: Pool<Postgres>,
        options: LeaseLooperOptions,
        lease_name: String,
        worker_id: String,
        state: S,
    ) -> Result<(), Box<dyn Error + Send + Sync>> {
        println!("launching looper, attempting to create table {}", worker_id);

        if let Err(e) = sqlx::query(
            r#"
            create table if not exists _pg_lease (
                name text,
                worker_id text,
                held_until timestamptz,
                primary key (name)
            )"#,
        )
        .execute(&pool)
        .await
        {
            eprintln!("Failed to create _pg_lease table for {}: {}", worker_id, e);
            return Err(format!("Failed to create _pg_lease table: {}", e).into());
        }

        println!("table created, attempting to acquire lease: {}", worker_id);

        loop {
            if let Err(e) =
                Self::wait_for_lease(&pool, &lease_name, worker_id.as_str(), &options).await
            {
                eprintln!("Error waiting for lease {}: {}", worker_id, e);
                tokio::time::sleep(options.loop_interval).await;
                continue;
            }

            let looper_handle = tokio::task::spawn(looper_func(
                LeaseContext {
                    lease_name: lease_name.clone(),
                    worker_id: worker_id.clone(),
                },
                state.clone(),
            ));

            let heartbeat_handle = tokio::task::spawn(Self::heartbeat_loop(
                looper_handle.abort_handle(),
                pool.clone(),
                lease_name.clone(),
                worker_id.clone(),
                options,
            ));

            let looper_join_result = looper_handle.await;

            heartbeat_handle.abort();

            match looper_join_result {
                Ok(Ok(())) => {
                    // looper_func returned successfully
                    println!("looper_func returned successfully, dropping lease");
                    if let Err(e) = Self::drop_lease(&pool, &lease_name, &worker_id).await {
                        eprintln!("Failed to drop lease for {}: {}", worker_id, e);
                    }
                    return Ok(());
                }
                Ok(Err(e)) => {
                    // looper_func returned an error
                    eprintln!("[ERR]: looper task failed: {:?}", e);
                    if let Err(drop_err) = Self::drop_lease(&pool, &lease_name, &worker_id).await {
                        eprintln!(
                            "Failed to drop lease after task failure for {}: {}",
                            worker_id, drop_err
                        );
                    }
                }
                Err(join_error) => {
                    // Task was cancelled or panicked
                    if join_error.is_cancelled() {
                        println!(
                            "looper task was cancelled (likely due to lost lease), continuing loop"
                        );
                        continue;
                    } else {
                        // Task panicked
                        eprintln!("[ERR]: looper task panicked: {:?}", join_error);
                        if let Err(drop_err) =
                            Self::drop_lease(&pool, &lease_name, &worker_id).await
                        {
                            eprintln!(
                                "Failed to drop lease after task panic for {}: {}",
                                worker_id, drop_err
                            );
                        }
                    }
                }
            }
        }
    }

    /// Polls `try_acquire_lease` until this worker holds the lease.
    ///
    /// Between attempts it sleeps for `loop_interval` plus a random jitter in
    /// `[0, loop_interval_jitter)` milliseconds. The jitter is drawn once per call, not
    /// once per attempt. Any acquisition error is logged and returned immediately.
    async fn wait_for_lease(
        pool: &Pool<Postgres>,
        lease_name: &str,
        worker_id: &str,
        options: &LeaseLooperOptions,
    ) -> Result<(), Box<dyn Error + Send + Sync>> {
        println!("attempting to acquire lease: {}", worker_id);

        // An empty range would panic in `random_range`, so zero jitter is special-cased.
        let jitter_millis = match options.loop_interval_jitter.as_millis() {
            0 => 0,
            upper_bound => rand::rng().random_range(0..upper_bound) as u64,
        };
        let pause = options.loop_interval + Duration::from_millis(jitter_millis);

        loop {
            let acquired = Self::try_acquire_lease(pool, lease_name, worker_id, options)
                .await
                .map_err(|e| {
                    eprintln!("Error trying to acquire lease for {}: {}", worker_id, e);
                    e
                })?;

            if acquired {
                println!("successfully acquired lease: {}", worker_id);
                return Ok(());
            }

            tokio::time::sleep(pause).await;
        }
    }

    /// Makes a single attempt to take the lease and reports whether this worker holds it.
    ///
    /// Runs one upsert against `_pg_lease`:
    /// - no row for `lease_name`: insert one owned by `worker_id`;
    /// - row expired (`held_until < NOW()`): take it over;
    /// - row already owned by `worker_id`: keep it and push `held_until` forward, so a
    ///   re-acquired lease is valid for a full `lease_duration` rather than whatever was
    ///   left from the previous heartbeat;
    /// - otherwise: leave the row untouched.
    ///
    /// Returns `Ok(true)` when the row's `worker_id` after the upsert equals `worker_id`,
    /// `Ok(false)` when another worker holds the lease, and `Err` on a query failure.
    async fn try_acquire_lease(
        pool: &Pool<Postgres>,
        lease_name: &str,
        worker_id: &str,
        options: &LeaseLooperOptions,
    ) -> Result<bool, Box<dyn Error + Send + Sync>> {
        let (holder, _held_until): (String, DateTime<Utc>) = sqlx::query_as(
            r#"
            INSERT INTO _pg_lease (name, worker_id, held_until)
            VALUES ($1, $2, NOW() + $3::interval)
            ON CONFLICT (name) DO UPDATE SET
                worker_id = CASE
                    WHEN _pg_lease.held_until < NOW() OR _pg_lease.worker_id = $2 THEN $2
                    ELSE _pg_lease.worker_id
                END,
                held_until = CASE
                    WHEN _pg_lease.held_until < NOW() OR _pg_lease.worker_id = $2
                        THEN NOW() + $3::interval
                    ELSE _pg_lease.held_until
                END
            RETURNING worker_id, held_until"#,
        )
        .bind(lease_name)
        .bind(worker_id)
        .bind(options.lease_duration)
        .fetch_one(pool)
        .await?;

        if holder == worker_id {
            println!("successfully acquired lease: {}", worker_id);
            Ok(true)
        } else {
            println!("lease held by another worker: {}", holder);
            Ok(false)
        }
    }

    /// Periodically renews the lease and aborts the looper task once it can no longer be renewed.
    ///
    /// Every `lease_heartbeat_interval` it sets `held_until` to `NOW() + lease_duration`,
    /// but only if the row is still owned by `worker_id` and has not expired. The loop ends,
    /// after calling `abort_handle.abort()`, when:
    /// - no row matched (the lease expired, was taken over, or was deleted), or
    /// - the query failed; ownership can no longer be confirmed, so the lease is treated
    ///   as lost.
    ///
    /// The loop sleeps before the first renewal, so the lease must already be valid for at
    /// least one heartbeat interval when this starts.
    async fn heartbeat_loop(
        abort_handle: AbortHandle,
        pool: Pool<Postgres>,
        lease_name: String,
        worker_id: String,
        options: LeaseLooperOptions,
    ) {
        loop {
            tokio::time::sleep(options.lease_heartbeat_interval).await;

            let renewed = sqlx::query_as::<_, (String, DateTime<Utc>)>(
                r#"
            UPDATE _pg_lease
            SET held_until = NOW() + $3::interval
            WHERE name = $1 AND worker_id = $2 AND held_until > NOW()
            RETURNING worker_id, held_until"#,
            )
            .bind(lease_name.as_str())
            .bind(worker_id.as_str())
            .bind(options.lease_duration)
            // `fetch_optional` separates "no row matched" (lease lost) from a query error.
            .fetch_optional(&pool)
            .await;

            match renewed {
                // Renewed; the WHERE clause guarantees the row belongs to this worker.
                Ok(Some(_)) => {}
                Ok(None) => {
                    println!("lost lease during heartbeat: {}", worker_id);
                    abort_handle.abort();
                    return;
                }
                Err(e) => {
                    eprintln!("Error during heartbeat for {}: {}", worker_id, e);
                    println!("lost lease during heartbeat due to error: {}", worker_id);
                    abort_handle.abort();
                    return;
                }
            }
        }
    }

    /// Releases the lease by deleting its row, but only if `worker_id` is the recorded holder.
    ///
    /// Succeeds (and logs success) even when no row matched; only a query failure is
    /// reported as `Err`.
    async fn drop_lease(
        pool: &Pool<Postgres>,
        lease_name: &str,
        worker_id: &str,
    ) -> Result<(), Box<dyn Error + Send + Sync>> {
        let delete_own_row = sqlx::query(
            r#"
            DELETE FROM _pg_lease WHERE name = $1 AND worker_id = $2"#,
        )
        .bind(lease_name)
        .bind(worker_id);

        delete_own_row.execute(pool).await?;

        println!("successfully dropped lease: {}", worker_id);
        Ok(())
    }

    /// Stops the lease looper by aborting the background task spawned by `start`.
    ///
    /// The stored abort handle is taken out, so calling `stop` again (or before `start`)
    /// does nothing. Currently always returns `Ok(())`.
    pub async fn stop(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
        let running_task = self.abort_handle.lock().await.take();

        if let Some(task) = running_task {
            task.abort();
        }

        Ok(())
    }
}

/// Transactionally verifies that a lease is held by the specified worker.
///
/// Runs inside the caller's transaction `tx` and returns:
/// - `Ok(true)` when `_pg_lease` has a row for `lease_name` owned by `worker_id` whose
///   `held_until` is still in the future,
/// - `Ok(false)` when there is no such row (never acquired, expired, or held by someone else),
/// - `Err` when the query itself fails.
///
/// NOTE(review): the row is read without `FOR SHARE`/`FOR UPDATE`, so the result reflects
/// the moment of the query under the transaction's isolation level — confirm whether
/// callers need a row lock to keep the lease from changing before they commit.
pub async fn verify_lease_held(
    tx: &mut sqlx::Transaction<'_, Postgres>,
    lease_name: &str,
    worker_id: &str,
) -> Result<bool, Box<dyn Error + Send + Sync>> {
    // `fetch_optional` turns "no matching row" into `None` rather than a `RowNotFound`
    // error, so a lease that is not held yields `Ok(false)`.
    let row = sqlx::query(
        r#"
        SELECT 1 FROM _pg_lease
        WHERE name = $1 AND worker_id = $2 AND held_until > NOW()"#,
    )
    .bind(lease_name)
    .bind(worker_id)
    .fetch_optional(&mut **tx)
    .await?;

    Ok(row.is_some())
}

/// Force revokes a lease by deleting its row from `_pg_lease`, whoever holds it.
///
/// Returns `Ok(true)` when a row was deleted, `Ok(false)` when no row existed for
/// `lease_name`, and `Err` when the query fails.
pub async fn force_revoke_lease(
    pool: &Pool<Postgres>,
    lease_name: &str,
) -> Result<bool, Box<dyn Error + Send + Sync>> {
    let deleted_rows = sqlx::query(r#"DELETE FROM _pg_lease WHERE name = $1"#)
        .bind(lease_name)
        .execute(pool)
        .await?
        .rows_affected();

    if deleted_rows == 0 {
        return Ok(false);
    }

    println!("Force revoked lease '{}'", lease_name);
    Ok(true)
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::time::SystemTime;
    use tokio::sync::{Mutex as TokioMutex, mpsc};
    use tokio::time::{sleep, timeout};

    const TEST_DB_URL: &str = "postgres://postgres:postgres@localhost:5432/postgres";

    /// State handed to the looper function in `test_lease_heartbeat`.
    #[derive(Clone)]
    struct SharedState {
        // Incremented by the looper function each time it runs.
        counter: Arc<TokioMutex<i32>>,
        // Printed by the looper function.
        message: String,
    }

    /// A single worker takes a 2-second lease with a 500 ms heartbeat and holds it for
    /// twice the lease duration. The test passes when the looper function signals
    /// completion within 6 seconds and the shared counter was incremented exactly once.
    ///
    /// Requires a Postgres instance reachable at `TEST_DB_URL`.
    #[tokio::test]
    async fn test_lease_heartbeat() -> Result<(), Box<dyn Error + Send + Sync>> {
        let pool = sqlx::PgPool::connect(TEST_DB_URL).await?;

        // Nanosecond timestamp keeps the lease name unique per run.
        let lease_name = format!(
            "test-lease-heartbeat-{}",
            SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        );
        let lease_duration = Duration::from_secs(2);
        let loop_interval = Duration::from_millis(500);

        let (lease_held_tx, mut lease_held_rx) = mpsc::channel::<bool>(1);
        let (lease_complete_tx, mut lease_complete_rx) = mpsc::channel::<bool>(1);

        // Wrapped in `Option` so each sender is taken and used at most once.
        let lease_held_tx = Arc::new(tokio::sync::Mutex::new(Some(lease_held_tx)));
        let lease_complete_tx = Arc::new(tokio::sync::Mutex::new(Some(lease_complete_tx)));

        // Create shared state that the looper function can access
        let shared_state = SharedState {
            counter: Arc::new(TokioMutex::new(0)),
            message: "Hello from shared state!".to_string(),
        };

        let looper_func = {
            let lease_held_tx = lease_held_tx.clone();
            let lease_complete_tx = lease_complete_tx.clone();
            let lease_name = lease_name.clone();
            move |_context: LeaseContext, state: SharedState| {
                let lease_held_tx = lease_held_tx.clone();
                let lease_complete_tx = lease_complete_tx.clone();
                let lease_name = lease_name.clone();
                async move {
                    println!("Worker acquired lease {}", lease_name);
                    println!("State message: {}", state.message);

                    // Increment the counter in shared state
                    {
                        let mut counter = state.counter.lock().await;
                        *counter += 1;
                        println!("Incremented counter to: {}", *counter);
                    }

                    if let Some(tx) = lease_held_tx.lock().await.take() {
                        let _ = tx.send(true).await;
                    }

                    // Keep the lease for 2x the lease duration to test heartbeating
                    sleep(2 * lease_duration).await;
                    println!("Worker held lease for 2x duration, returning");

                    if let Some(tx) = lease_complete_tx.lock().await.take() {
                        let _ = tx.send(true).await;
                    }

                    Ok(())
                }
            }
        };

        let options = LeaseLooperOptions {
            loop_interval,
            loop_interval_jitter: Duration::from_secs(0),
            lease_duration,
            lease_heartbeat_interval: Duration::from_millis(500), // Much shorter than lease duration
        };

        let looper = LeaseLooper::new(
            lease_name.clone(),
            looper_func,
            "heartbeat-worker".to_string(),
            pool.clone(),
            options,
            shared_state.clone(),
        );

        // Start the looper
        looper.start().await?;

        // Wait for lease to be acquired
        let lease_acquired = timeout(Duration::from_secs(5), lease_held_rx.recv()).await;
        match lease_acquired {
            Ok(Some(true)) => println!("Successfully acquired lease {}", lease_name),
            _ => panic!("Failed to acquire lease {} within 5 seconds", lease_name),
        }

        // Wait for the worker to complete its 4-second hold
        let lease_completed = timeout(Duration::from_secs(6), lease_complete_rx.recv()).await;
        match lease_completed {
            Ok(Some(true)) => {
                println!("Successfully held lease for 2x duration via heartbeating");

                // Verify that the shared state was accessed and modified
                let final_counter = *shared_state.counter.lock().await;
                println!("Final counter value: {}", final_counter);

                looper.stop().await?;

                if final_counter == 1 {
                    println!("✓ Shared state was correctly accessed and modified");
                    Ok(())
                } else {
                    panic!("Expected counter to be 1, but got {}", final_counter);
                }
            }
            _ => {
                looper.stop().await?;
                panic!("Worker did not complete 4-second hold - heartbeating may have failed");
            }
        }
    }

    /// Two workers compete for one lease. Worker-1 acquires it, returns `Ok(())` after one
    /// second, and worker-2 (started while worker-1 still holds the lease) is then expected
    /// to acquire it within five seconds.
    ///
    /// Requires a Postgres instance reachable at `TEST_DB_URL`.
    #[tokio::test]
    async fn test_lease_drop_on_return() -> Result<(), Box<dyn Error + Send + Sync>> {
        let pool = sqlx::PgPool::connect(TEST_DB_URL).await?;

        // Nanosecond timestamp keeps the lease name unique per run.
        let lease_name = format!(
            "test-lease-drop-{}",
            SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        );
        let lease_duration = Duration::from_secs(5);
        let loop_interval = Duration::from_millis(200);

        let (worker1_got_tx, mut worker1_got_rx) = mpsc::channel::<String>(1);
        let (worker1_done_tx, mut worker1_done_rx) = mpsc::channel::<bool>(1);
        let (worker2_got_tx, mut worker2_got_rx) = mpsc::channel::<String>(1);

        // Worker 1: Returns after 1 second
        let worker1_func = {
            let worker1_got_tx = worker1_got_tx.clone();
            let worker1_done_tx = worker1_done_tx.clone();
            let lease_name = lease_name.clone();
            move |_, _| {
                let worker1_got_tx = worker1_got_tx.clone();
                let worker1_done_tx = worker1_done_tx.clone();
                let lease_name = lease_name.clone();
                async move {
                    println!("Worker-1 acquired lease {}", lease_name);
                    let _ = worker1_got_tx.send("worker-1".to_string()).await;

                    sleep(Duration::from_secs(1)).await;
                    println!("Worker-1 returning to drop lease {}", lease_name);
                    let _ = worker1_done_tx.send(true).await;

                    Ok(()) // Return to drop the lease
                }
            }
        };

        // Worker 2: Waits to get the lease
        let worker2_func = {
            let worker2_got_tx = worker2_got_tx.clone();
            let lease_name = lease_name.clone();
            move |_, _| {
                let worker2_got_tx = worker2_got_tx.clone();
                let lease_name = lease_name.clone();
                async move {
                    println!("Worker-2 acquired lease {}", lease_name);
                    let _ = worker2_got_tx.send("worker-2".to_string()).await;

                    // Hold until we get cancelled (when test ends)
                    loop {
                        sleep(Duration::from_secs(1)).await;
                    }
                }
            }
        };

        // Both workers share the same timings.
        let options = LeaseLooperOptions {
            loop_interval,
            loop_interval_jitter: Duration::from_secs(0),
            lease_duration,
            lease_heartbeat_interval: Duration::from_secs(1),
        };

        let looper1 = LeaseLooper::new(
            lease_name.clone(),
            worker1_func,
            "worker-1".to_string(),
            pool.clone(),
            options,
            (),
        );

        let looper2 = LeaseLooper::new(
            lease_name.clone(),
            worker2_func,
            "worker-2".to_string(),
            pool.clone(),
            options,
            (),
        );

        // Start worker 1 first
        looper1.start().await?;

        // Wait for worker 1 to get the lease
        let worker1_acquired = timeout(Duration::from_secs(5), worker1_got_rx.recv()).await;
        match worker1_acquired {
            Ok(Some(worker_id)) => {
                if worker_id != "worker-1" {
                    panic!("Expected worker-1, got {}", worker_id);
                }
                println!("Worker-1 successfully acquired lease {}", lease_name);
            }
            _ => panic!(
                "Worker-1 failed to get lease {} within 5 seconds",
                lease_name
            ),
        }

        sleep(Duration::from_millis(100)).await;

        // Now start worker 2
        println!("Starting worker 2");
        looper2.start().await?;

        // Wait for worker 1 to finish - it should drop the lease naturally
        let worker1_finished = timeout(Duration::from_secs(3), worker1_done_rx.recv()).await;
        match worker1_finished {
            Ok(Some(true)) => {
                println!("Worker-1 finished and should have dropped lease naturally");
                // Give a moment for the lease to be dropped
                sleep(Duration::from_millis(200)).await;
            }
            _ => panic!("Worker-1 did not finish within 3 seconds"),
        }

        // Wait for worker 2 to get the lease after worker 1 returns
        let worker2_acquired = timeout(Duration::from_secs(5), worker2_got_rx.recv()).await;
        match worker2_acquired {
            Ok(Some(worker_id)) => {
                if worker_id != "worker-2" {
                    panic!("Expected worker-2, got {}", worker_id);
                }
                println!(
                    "Worker-2 successfully acquired lease {} after worker-1 dropped it",
                    lease_name
                );
                // Clean up both loopers
                looper1.stop().await?;
                looper2.stop().await?;
                Ok(())
            }
            _ => {
                // Clean up both loopers on failure
                looper1.stop().await?;
                looper2.stop().await?;
                panic!(
                    "Worker-2 failed to get lease {} within 5 seconds after worker-1 returned",
                    lease_name
                );
            }
        }
    }

    /// While holding the lease, the looper function opens a transaction and calls
    /// `verify_lease_held` with its own worker id. The test expects `true` to be reported
    /// within two seconds of acquisition.
    ///
    /// Requires a Postgres instance reachable at `TEST_DB_URL`.
    #[tokio::test]
    async fn test_verify_lease_held() -> Result<(), Box<dyn Error + Send + Sync>> {
        let pool = sqlx::PgPool::connect(TEST_DB_URL).await?;

        // Nanosecond timestamp keeps the lease name unique per run.
        let lease_name = format!(
            "test-lease-verify-{}",
            SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        );
        let lease_duration = Duration::from_secs(2);
        let loop_interval = Duration::from_millis(500);

        let (lease_held_tx, mut lease_held_rx) = mpsc::channel::<bool>(1);
        let (verify_result_tx, mut verify_result_rx) = mpsc::channel::<bool>(1);

        let looper_func = {
            let lease_held_tx = lease_held_tx.clone();
            let verify_result_tx = verify_result_tx.clone();
            let lease_name = lease_name.clone();
            let pool = pool.clone();
            move |_, _| {
                let lease_held_tx = lease_held_tx.clone();
                let verify_result_tx = verify_result_tx.clone();
                let lease_name = lease_name.clone();
                let pool = pool.clone();
                async move {
                    println!("Worker acquired lease {}", lease_name);
                    let _ = lease_held_tx.send(true).await;

                    // Start a transaction to verify the lease is held
                    let mut tx = pool.begin().await?;

                    // Must match the worker id given to `LeaseLooper::new` below.
                    let held = verify_lease_held(&mut tx, &lease_name, "verify-worker").await?;
                    tx.rollback().await?;

                    println!("VerifyLeaseHeld returned: {}", held);
                    let _ = verify_result_tx.send(held).await;

                    // Hold the lease for a bit then return
                    sleep(Duration::from_millis(500)).await;
                    Ok(())
                }
            }
        };

        let options = LeaseLooperOptions {
            loop_interval,
            loop_interval_jitter: Duration::from_secs(0),
            lease_duration,
            lease_heartbeat_interval: Duration::from_millis(500),
        };

        let looper = LeaseLooper::new(
            lease_name.clone(),
            looper_func,
            "verify-worker".to_string(),
            pool.clone(),
            options,
            (),
        );

        // Start the looper
        looper.start().await?;

        // Wait for lease to be acquired
        let lease_acquired = timeout(Duration::from_secs(5), lease_held_rx.recv()).await;
        match lease_acquired {
            Ok(Some(true)) => println!("Successfully acquired lease {}", lease_name),
            _ => panic!("Failed to acquire lease {} within 5 seconds", lease_name),
        }

        // Wait for verification result
        let verify_result = timeout(Duration::from_secs(2), verify_result_rx.recv()).await;
        match verify_result {
            Ok(Some(held)) => {
                if !held {
                    looper.stop().await?;
                    panic!("Expected VerifyLeaseHeld to return true when lease is held, got false");
                } else {
                    println!("VerifyLeaseHeld correctly returned true for held lease");
                    looper.stop().await?;
                    Ok(())
                }
            }
            _ => {
                looper.stop().await?;
                panic!("VerifyLeaseHeld did not return result within 2 seconds");
            }
        }
    }

    /// Checks that a second worker can take over a lease whose holder heartbeats too slowly.
    ///
    /// Worker 1 is configured with a 2s heartbeat interval against an 800ms lease
    /// duration; worker 2 uses a 200ms heartbeat interval. The test passes once
    /// worker 1 and then worker 2 have each signalled that their looper function
    /// was invoked. It does not assert that worker 1 was cancelled afterwards; it
    /// only waits 500ms before shutting both loopers down.
    ///
    /// Requires a Postgres instance reachable at `TEST_DB_URL`.
    #[tokio::test]
    async fn test_lease_heartbeat_failure() -> Result<(), Box<dyn Error + Send + Sync>> {
        let pool = sqlx::PgPool::connect(TEST_DB_URL).await?;

        // Nanosecond timestamp suffix keeps the lease name distinct between runs.
        let lease_name = format!(
            "test-lease-heartbeat-fail-{}",
            SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        );
        let lease_duration = Duration::from_millis(800); // Short lease duration
        let loop_interval = Duration::from_millis(100);

        // Each worker reports on its own channel when its looper function starts running.
        let (worker1_got_tx, mut worker1_got_rx) = mpsc::channel::<bool>(1);
        let (worker2_got_tx, mut worker2_got_rx) = mpsc::channel::<bool>(1);

        // Worker 1: Has broken heartbeat (too long interval)
        let worker1_func = {
            let worker1_got_tx = worker1_got_tx.clone();
            let lease_name = lease_name.clone();
            move |_, _| {
                // Re-clone per invocation so the closure can be called more than once.
                let worker1_got_tx = worker1_got_tx.clone();
                let lease_name = lease_name.clone();
                async move {
                    println!("Worker-1 acquired lease {}", lease_name);
                    // A send error is ignored on purpose: the signal is best-effort.
                    let _ = worker1_got_tx.send(true).await;

                    // Never returns on its own; the body only ends if the future is
                    // stopped from outside (expected once the lease is lost).
                    loop {
                        sleep(Duration::from_millis(100)).await;
                    }
                }
            }
        };

        // Worker 2: Has proper heartbeat to steal the lease
        let worker2_func = {
            let worker2_got_tx = worker2_got_tx.clone();
            let lease_name = lease_name.clone();
            move |_, _| {
                let worker2_got_tx = worker2_got_tx.clone();
                let lease_name = lease_name.clone();
                async move {
                    println!("Worker-2 acquired lease {}", lease_name);
                    let _ = worker2_got_tx.send(true).await;

                    // Hold until we get cancelled (when test ends)
                    loop {
                        sleep(Duration::from_secs(1)).await;
                    }
                }
            }
        };

        // Worker 1 options with broken heartbeat
        let worker1_options = LeaseLooperOptions {
            loop_interval,
            loop_interval_jitter: Duration::from_secs(0),
            lease_duration,
            lease_heartbeat_interval: Duration::from_secs(2), // Much longer than lease duration - should fail
        };

        // Worker 2 options with proper heartbeat
        let worker2_options = LeaseLooperOptions {
            loop_interval,
            loop_interval_jitter: Duration::from_secs(0),
            lease_duration,
            lease_heartbeat_interval: Duration::from_millis(200), // Proper heartbeat interval
        };

        let looper1 = LeaseLooper::new(
            lease_name.clone(),
            worker1_func,
            "worker-1".to_string(),
            pool.clone(),
            worker1_options,
            (),
        );

        let looper2 = LeaseLooper::new(
            lease_name.clone(),
            worker2_func,
            "worker-2".to_string(),
            pool.clone(),
            worker2_options,
            (),
        );

        // Start worker 1 first
        looper1.start().await?;

        // Wait for worker 1 to get the lease
        let worker1_acquired = timeout(Duration::from_secs(5), worker1_got_rx.recv()).await;
        match worker1_acquired {
            Ok(Some(true)) => println!("Worker-1 acquired lease"),
            _ => {
                // Stop the already-started looper before failing, matching the other
                // failure paths. Worker 2 has not been started yet at this point.
                looper1.stop().await?;
                panic!("Worker-1 failed to acquire lease within 5 seconds");
            }
        }

        // Start worker 2 which should steal the lease
        looper2.start().await?;

        // Wait for worker 2 to steal the lease
        let worker2_acquired = timeout(Duration::from_secs(3), worker2_got_rx.recv()).await;
        match worker2_acquired {
            Ok(Some(true)) => println!("Worker-2 stole the lease"),
            _ => {
                looper1.stop().await?;
                looper2.stop().await?;
                panic!("Worker-2 should have stolen the lease within 3 seconds");
            }
        }

        // Give a moment for worker1 to detect it lost the lease and get cancelled
        sleep(Duration::from_millis(500)).await;

        // Stop both workers
        looper1.stop().await?;
        looper2.stop().await?;

        println!(
            "Test passed: Worker-2 successfully stole lease from Worker-1 with broken heartbeat"
        );
        Ok(())
    }

    /// Exercises `force_revoke_lease` in three situations: a lease that is currently
    /// held, the same lease immediately after it was revoked, and a lease name that
    /// was never used.
    ///
    /// The first call is expected to return `true`; the other two are expected to
    /// return `false`. Requires a Postgres instance reachable at `TEST_DB_URL`.
    #[tokio::test]
    async fn test_force_revoke_lease() -> Result<(), Box<dyn Error + Send + Sync>> {
        let pool = sqlx::PgPool::connect(TEST_DB_URL).await?;

        // Nanosecond timestamp suffix keeps the lease name distinct between runs.
        let unique_suffix = SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let lease_name = format!("test-force-revoke-{}", unique_suffix);

        // The looper function reports on this channel once it starts running.
        let (acquired_tx, mut acquired_rx) = mpsc::channel::<bool>(1);

        let looper_func = {
            let tx = acquired_tx.clone();
            let name = lease_name.clone();
            move |_, _| {
                // Fresh clones per invocation so the closure can be called repeatedly.
                let tx = tx.clone();
                let name = name.clone();
                async move {
                    println!("Worker acquired lease {}", name);
                    // Best-effort signal; a send failure is deliberately ignored.
                    let _ = tx.send(true).await;

                    // Never returns on its own; keeps running until stopped from
                    // outside (expected after the forced revocation).
                    loop {
                        sleep(Duration::from_millis(100)).await;
                    }
                }
            }
        };

        let options = LeaseLooperOptions {
            loop_interval: Duration::from_millis(200),
            loop_interval_jitter: Duration::from_secs(0),
            lease_duration: Duration::from_secs(5),
            lease_heartbeat_interval: Duration::from_millis(500),
        };

        let looper = LeaseLooper::new(
            lease_name.clone(),
            looper_func,
            String::from("force-revoke-worker"),
            pool.clone(),
            options,
            (),
        );
        looper.start().await?;

        // Block (up to 5s) until the worker reports that it is running.
        let first_signal = timeout(Duration::from_secs(5), acquired_rx.recv()).await;
        if !matches!(first_signal, Ok(Some(true))) {
            looper.stop().await?;
            panic!("Failed to acquire lease {} within 5 seconds", lease_name);
        }
        println!("Successfully acquired lease {}", lease_name);

        // Short pause so the lease has settled before it is revoked.
        sleep(Duration::from_millis(100)).await;

        // Case 1: revoking the held lease must report success.
        if !force_revoke_lease(&pool, &lease_name).await? {
            looper.stop().await?;
            panic!("Expected force_revoke_lease to return true, got false");
        }
        println!("Successfully force-revoked lease {}", lease_name);

        // Case 2: an immediate second revocation of the same name must report false.
        if force_revoke_lease(&pool, &lease_name).await? {
            looper.stop().await?;
            panic!(
                "Expected second force_revoke_lease to return false for non-existent lease, got true"
            );
        }
        println!("Correctly returned false when trying to revoke non-existent lease");

        // Case 3: a name that was never leased must also report false.
        let fake_lease_name = format!("{}-nonexistent", lease_name);
        if force_revoke_lease(&pool, &fake_lease_name).await? {
            looper.stop().await?;
            panic!("Expected force_revoke_lease on fake lease to return false, got true");
        }
        println!("Correctly returned false when trying to revoke completely fake lease");

        looper.stop().await?;
        Ok(())
    }
}