coalesced_map 0.1.3

A thread-safe, deduplicating map that ensures expensive computations are executed only once per key
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
#![deny(missing_docs)]

//! A thread-safe, deduplicating map that ensures expensive computations are
//! executed only once per key, even when multiple concurrent requests are made.
//!
//! This map is designed for scenarios where multiple async tasks might request
//! the same resource simultaneously. Instead of performing duplicate work, the
//! `CoalescedMap` ensures that only the first request for a given key executes
//! the initialization function, while subsequent concurrent requests wait for
//! and receive the same result.
//!
//! The implementation uses `DashMap` for thread-safe storage and
//! `tokio::sync::broadcast` channels for coordinating between concurrent
//! waiters.
//!
//! ## Example
//!
//! ```rust,ignore
//! use coalesced_map::{CoalescedMap, CoalescedGetError};
//! use std::{sync::Arc, time::Duration};
//!
//! #[tokio::main]
//! async fn main() {
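//!     // Create with default hasher (RandomState). The map is wrapped in an `Arc`
//!     // so that every task shares one map: cloning a `CoalescedMap` itself
//!     // copies its contents, and requests made through separate copies are
//!     // not coalesced with each other.
//!     let cache: Arc<CoalescedMap<String, Arc<String>>> = Arc::new(CoalescedMap::new());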
//!
//!     // Or create with custom hasher
//!     use std::collections::hash_map::RandomState;
//!     let hasher = RandomState::new();
//!     let cache_with_hasher: CoalescedMap<String, Arc<String>, RandomState> =
//!         CoalescedMap::with_hasher(hasher);
//!
//!     // Simulate multiple concurrent requests for the same expensive resource
//!     let key = "expensive_computation".to_string();
//!
//!     let handle1 = {
//!         let cache = cache.clone();
//!         let key = key.clone();
//!         tokio::spawn(async move {
//!             cache.get_or_try_init(key, || async {
//!                 // Simulate expensive work (e.g., network request, file I/O)
//!                 tokio::time::sleep(Duration::from_millis(100)).await;
//!                 println!("Performing expensive computation...");
//!                 Ok(Arc::new("computed_result".to_string()))
//!             }).await
//!         })
//!     };
//!
//!     let handle2 = {
//!         let cache = cache.clone();
//!         let key = key.clone();
//!         tokio::spawn(async move {
//!             cache.get_or_try_init(key, || async {
//!                 // This function will NOT be executed due to coalescing
//!                 println!("This should not print!");
//!                 Ok(Arc::new("unused".to_string()))
//!             }).await
//!         })
//!     };
//!
//!     let result1 = handle1.await.unwrap().unwrap();
//!     let result2 = handle2.await.unwrap().unwrap();
//!
//!     // Both results are identical (same Arc instance)
//!     assert!(Arc::ptr_eq(&result1, &result2));
//!     assert_eq!(*result1, "computed_result");
//! }
//! ```

use std::{
    fmt,
    hash::{BuildHasher, Hash, RandomState},
    sync::{Arc, Weak},
};

use dashmap::{DashMap, mapref::entry::Entry};
use tokio::sync::broadcast;

/// The failure modes of [`CoalescedMap::get_or_try_init`].
///
/// Only the task that wins the race for a key runs the initializer; every
/// other concurrent caller subscribes to a broadcast channel and waits. When
/// the initializer fails, the winner gets the underlying error back wrapped in
/// `Init`. The channel's sender is then dropped without a value having been
/// published, so the subscribers observe `CoalescedRequestFailed` instead.
#[derive(Debug)]
pub enum CoalescedGetError<E> {
    /// The initializer executed by this call returned the wrapped error.
    Init(E),
    /// The in-flight request this call was waiting on ended without
    /// publishing a value.
    CoalescedRequestFailed,
}

impl<E> fmt::Display for CoalescedGetError<E>
where
    E: fmt::Display,
{
    /// Renders a short human-readable description; for `Init` the wrapped
    /// error's own `Display` output is appended.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Init(cause) => f.write_fmt(format_args!("initializer failed: {cause}")),
            Self::CoalescedRequestFailed => f.write_str("a coalesced request failed"),
        }
    }
}

// No `source()` override: the wrapped error is only required to be
// `Debug + Display`, not an `Error` itself.
impl<E> std::error::Error for CoalescedGetError<E> where E: fmt::Debug + fmt::Display {}

/// A thread-safe map that deduplicates concurrent async initialization
/// requests.
///
/// When multiple tasks concurrently request the same key, only the first task
/// executes the initialization function. Other tasks automatically wait for the
/// result via a broadcast channel. This prevents duplicate work and ensures
/// consistent results across all waiters.
///
/// Internally, each entry can be in one of two states:
/// - **Pending**: An initialization is in progress, tracked by a broadcast
///   sender
/// - **Fetched**: The value has been computed and cached for immediate
///   retrieval
///
/// # Sharing between tasks
///
/// All methods take `&self`, so a single map can be shared by reference or
/// behind an `Arc`. The derived `Clone` clones the inner `DashMap`.
/// NOTE(review): `DashMap::clone` is understood to copy the stored entries
/// rather than hand out a second handle to the same storage, in which case
/// requests made through two clones are not coalesced with each other —
/// confirm against the dashmap documentation and prefer `Arc<CoalescedMap<..>>`
/// when tasks must share state.
#[derive(Clone)]
pub struct CoalescedMap<K, V, S = RandomState>
where
    K: Eq + Hash,
    V: Clone,
    S: BuildHasher + Clone,
{
    // Per-key state: either a weak handle to the in-flight initializer's
    // broadcast sender, or the finished value.
    map: DashMap<K, PendingOrFetched<V>, S>,
}

impl<K, V> Default for CoalescedMap<K, V, RandomState>
where
    K: Eq + Hash,
    V: Clone,
{
    fn default() -> Self {
        Self::new()
    }
}

impl<K: Eq + Hash, V: Clone> CoalescedMap<K, V, RandomState> {
    /// Builds a `CoalescedMap` that holds no entries and hashes keys with the
    /// default hasher.
    pub fn new() -> Self {
        let map = DashMap::new();
        Self { map }
    }

    /// Builds an empty `CoalescedMap` with the default hasher, forwarding
    /// `capacity` to `DashMap::with_capacity` for the backing storage.
    pub fn with_capacity(capacity: usize) -> Self {
        let map = DashMap::with_capacity(capacity);
        Self { map }
    }
}

impl<K: Eq + Hash, V: Clone, S: BuildHasher + Clone> CoalescedMap<K, V, S> {
    /// Builds a `CoalescedMap` with no entries that hashes keys using
    /// `hasher`.
    pub fn with_hasher(hasher: S) -> Self {
        let map = DashMap::with_hasher(hasher);
        Self { map }
    }

    /// Builds an empty `CoalescedMap` that hashes keys using `hasher`,
    /// forwarding `capacity` to the backing `DashMap`.
    pub fn with_capacity_and_hasher(capacity: usize, hasher: S) -> Self {
        let map = DashMap::with_capacity_and_hasher(capacity, hasher);
        Self { map }
    }

    /// Number of entries held by the map. Entries whose initialization is
    /// still pending are counted as well as fetched ones.
    pub fn len(&self) -> usize {
        let inner = &self.map;
        inner.len()
    }

    /// Whether the map holds no entries at all (neither pending nor fetched).
    pub fn is_empty(&self) -> bool {
        let inner = &self.map;
        inner.is_empty()
    }
}

impl<K, V, S> CoalescedMap<K, V, S>
where
    K: Eq + Hash + Clone,
    V: Clone + Send + Sync + 'static,
    S: BuildHasher + Clone,
{
    /// Returns the value for `key`, initializing it at most once using the
    /// provided async `init` function. Concurrent calls for the same key are
    /// coalesced: only the first executes `init`; others await the result.
    ///
    /// On successful initialization, the value is inserted and shared with all
    /// waiters. If the initializer returns an error, the error is returned to
    /// the caller that executed it, while other waiters receive
    /// `CoalescedRequestFailed`.
    pub async fn get_or_try_init<E, Fut, F>(
        &self,
        key: K,
        init: F,
    ) -> Result<V, CoalescedGetError<E>>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<V, E>>,
    {
        // Attempt to occupy or observe the entry.
        let sender = match self.map.entry(key.clone()) {
            Entry::Vacant(entry) => {
                // First caller: create a broadcast sender and mark as pending.
                let (tx, _) = broadcast::channel(1);
                let tx = Arc::new(tx);
                entry.insert(PendingOrFetched::Pending(Arc::downgrade(&tx)));
                tx
            }
            Entry::Occupied(mut entry) => match entry.get() {
                PendingOrFetched::Fetched(v) => return Ok(v.clone()),
                PendingOrFetched::Pending(weak_tx) => {
                    if let Some(tx) = weak_tx.upgrade() {
                        // Subscribe before dropping the entry to avoid missing the send.
                        let mut rx = tx.subscribe();

                        // We only care about the receiver, drop the sender immediately.
                        drop(tx);

                        // Drop the entry to allow others to query the map
                        drop(entry);

                        return rx
                            .recv()
                            .await
                            .map_err(|_err| CoalescedGetError::CoalescedRequestFailed);
                    }

                    // Previous sender dropped without publishing; become the new initializer.
                    let (tx, _) = broadcast::channel(1);
                    let tx = Arc::new(tx);
                    entry.insert(PendingOrFetched::Pending(Arc::downgrade(&tx)));
                    tx
                }
            },
        };

        // We are the initializer for this key. Run the init future.
        match init().await {
            Ok(value) => {
                // Store the value and notify any waiters.
                self.map
                    .insert(key, PendingOrFetched::Fetched(value.clone()));
                let _ = sender.send(value.clone());
                Ok(value)
            }
            Err(err) => Err(CoalescedGetError::Init(err)),
        }
    }

    /// Attempts to get a previously fetched value without triggering
    /// initialization. Returns `Some` if the entry is present and fetched.
    pub fn get(&self, key: &K) -> Option<V> {
        self.map.get(key).and_then(|g| match g.value() {
            PendingOrFetched::Fetched(v) => Some(v.clone()),
            PendingOrFetched::Pending(_) => None,
        })
    }

    /// Clears all entries matching the predicate, similar to `DashMap::retain`.
    pub fn retain<F>(&self, mut f: F)
    where
        F: FnMut(&K, &PendingOrFetched<V>) -> bool,
    {
        self.map.retain(|k, v| f(k, v));
    }

    /// Clears all non-pending cached values, preserving only in-flight entries.
    ///
    /// This removes entries whose state is `Fetched(..)` while keeping entries
    /// that are currently `Pending(..)` so ongoing initializations are not
    /// disrupted.
    pub fn clear(&self) {
        self.map
            .retain(|_, v| matches!(v, PendingOrFetched::Pending(_)));
    }
}

/// Internal state for a coalesced entry.
///
/// The type is public because the predicate passed to
/// [`CoalescedMap::retain`] receives a reference to it.
#[derive(Clone)]
pub enum PendingOrFetched<T> {
    /// There is an in-flight initialization; waiters subscribe to the sender.
    ///
    /// Only a `Weak` handle is stored. When it can no longer be upgraded, the
    /// sender has been dropped and no value will be published through it.
    Pending(Weak<broadcast::Sender<T>>),
    /// The value has been initialized.
    Fetched(T),
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::{
        future::pending,
        sync::{
            Arc,
            atomic::{AtomicUsize, Ordering},
        },
    };
    use tokio::task::JoinHandle;

    #[tokio::test]
    async fn test_basic_get_or_try_init() {
        let cache: CoalescedMap<String, String> = CoalescedMap::new();
        let key = || "key1".to_string();

        // The first request runs its initializer and yields that value.
        let first = cache
            .get_or_try_init(key(), || async { Ok::<_, &str>("value1".to_string()) })
            .await
            .unwrap();
        assert_eq!(first, "value1");

        // The entry is fetched now, so this initializer must be ignored and
        // the cached value handed back instead.
        let second = cache
            .get_or_try_init(key(), || async {
                Ok::<_, &str>("should_not_be_called".to_string())
            })
            .await
            .unwrap();
        assert_eq!(second, "value1");
    }

    #[tokio::test]
    async fn test_get_if_fetched() {
        let cache: CoalescedMap<String, String> = CoalescedMap::new();
        let key = String::from("key1");

        // Nothing has been stored yet, so a plain lookup misses.
        assert_eq!(cache.get(&key), None);

        // Populate the entry through the initializing API.
        let init = || async { Ok::<_, &str>("value1".to_string()) };
        cache.get_or_try_init(key.clone(), init).await.unwrap();

        // The lookup now sees the fetched value.
        assert_eq!(cache.get(&key), Some("value1".to_string()));
    }

    // Ten tasks request the same key at once; exactly one initializer may run
    // and every task must end up with the very same `Arc`.
    #[tokio::test]
    async fn test_concurrent_initialization() {
        let map: Arc<CoalescedMap<String, Arc<String>>> = Arc::new(CoalescedMap::new());
        let call_count = Arc::new(AtomicUsize::new(0));
        // One barrier slot per spawned task (the test task does not take part).
        let barrier = Arc::new(tokio::sync::Barrier::new(10));

        // Start multiple concurrent tasks that try to initialize the same key
        let handles: Vec<_> = (0..10)
            .map(|i| {
                let map = map.clone();
                let call_count = call_count.clone();
                let barrier = barrier.clone();
                tokio::spawn(async move {
                    // Wait for all tasks to be ready before starting
                    barrier.wait().await;

                    map.get_or_try_init("shared_key".to_string(), || {
                        let call_count = call_count.clone();
                        async move {
                            // Track how many times the init function is called
                            call_count.fetch_add(1, Ordering::SeqCst);

                            // The value embeds the task index, so results from
                            // different initializers would be distinguishable.
                            Ok::<_, &str>(Arc::new(format!("value_from_task_{i}")))
                        }
                    })
                    .await
                })
            })
            .collect();

        // Wait for all tasks to complete; a panicked task or an `Err` result
        // fails the test through the unwraps.
        let results: Vec<_> = futures::future::try_join_all(handles)
            .await
            .unwrap()
            .into_iter()
            .map(|r| r.unwrap())
            .collect();

        // The initialization function should only be called once
        assert_eq!(call_count.load(Ordering::SeqCst), 1);

        // All results should be identical (same Arc instance)
        let first_result = &results[0];
        for result in &results {
            assert!(Arc::ptr_eq(first_result, result));
        }
    }

    #[tokio::test]
    async fn test_error_handling() {
        let cache: CoalescedMap<String, String> = CoalescedMap::new();
        let key = String::from("error_key");

        // A failing initializer surfaces its error through the `Init` variant.
        let failed = cache
            .get_or_try_init(key.clone(), || async { Err("initialization failed") })
            .await;
        let Err(CoalescedGetError::Init(err)) = failed else {
            panic!("Expected Init error");
        };
        assert_eq!(err, "initialization failed");

        // The failed attempt must not have left a fetched value behind.
        assert_eq!(cache.get(&key), None);

        // A new request for the same key runs its own initializer.
        let retried = cache
            .get_or_try_init(key, || async {
                Ok::<_, &str>("success_value".to_string())
            })
            .await
            .unwrap();
        assert_eq!(retried, "success_value");
    }

    // Five tasks request the same key. Only task 0's initializer would
    // succeed; the others carry failing initializers that must never run
    // because their requests are coalesced onto task 0's.
    //
    // NOTE(review): this presumes task 0 is the first to claim the key (tasks
    // are spawned in index order) — confirm this holds on the test runtime.
    #[tokio::test]
    async fn test_concurrent_error_handling() {
        let map = Arc::new(CoalescedMap::new());
        let init_calls = Arc::new(AtomicUsize::new(0));
        // Two parties: task 0's initializer and the test task itself.
        let barrier = Arc::new(tokio::sync::Barrier::new(2));

        // Start multiple concurrent tasks; only task 0's initializer succeeds
        let handles: Vec<_> = (0..5)
            .map(|i| {
                let map = map.clone();
                let init_calls = init_calls.clone();
                let barrier = barrier.clone();
                tokio::spawn(async move {
                    map.get_or_try_init("fail_key".to_string(), || {
                        let init_calls = init_calls.clone();
                        async move {
                            // Count every initializer that actually executes.
                            init_calls.fetch_add(1, Ordering::SeqCst);

                            if i == 0 {
                                // First task succeeds once the test task has
                                // also reached the barrier
                                barrier.wait().await;
                                Ok(format!("success_{i}"))
                            } else {
                                // Other tasks would fail, but they should be coalesced
                                // and receive the successful result
                                Err(format!("error_{i}"))
                            }
                        }
                    })
                    .await
                })
            })
            .collect();

        // Rendezvous with task 0's initializer, releasing it to finish.
        barrier.wait().await;

        // Wait for all tasks to complete.
        let results: Vec<_> = futures::future::join_all(handles).await;

        // Since the requests were coalesced, only one initializer (task 0's)
        // may have run.
        assert_eq!(init_calls.load(Ordering::SeqCst), 1);

        // All tasks should get the same successful result
        for result in results {
            let value = result.unwrap().unwrap();
            assert_eq!(value, "success_0");
        }
    }

    #[tokio::test]
    async fn test_different_keys() {
        let shared = Arc::new(CoalescedMap::new());

        // One task per distinct key, all spawned up front.
        let mut tasks = Vec::new();
        for idx in 0..5 {
            let shared = shared.clone();
            tasks.push(tokio::spawn(async move {
                let key = format!("key_{idx}");
                let value = format!("value_{idx}");
                let stored = shared
                    .get_or_try_init(key.clone(), || async move { Ok::<_, &str>(value) })
                    .await;
                stored.map(|v| (key, v))
            }));
        }

        // Results come back in spawn order; every key must map to the value
        // produced for that same index.
        let joined = futures::future::try_join_all(tasks).await.unwrap();
        let pairs: Vec<_> = joined.into_iter().map(|outcome| outcome.unwrap()).collect();
        for (idx, (key, value)) in pairs.into_iter().enumerate() {
            assert_eq!(key, format!("key_{idx}"));
            assert_eq!(value, format!("value_{idx}"));
        }

        // Each key is now individually retrievable from the cache.
        for idx in 0..5 {
            let key = format!("key_{idx}");
            assert_eq!(shared.get(&key), Some(format!("value_{idx}")));
        }
    }

    #[tokio::test]
    async fn test_retain_functionality() {
        let cache: CoalescedMap<String, String> = CoalescedMap::new();

        // Populate key_0 ..= key_4 with matching values.
        for idx in 0..5 {
            let value = format!("value_{idx}");
            cache
                .get_or_try_init(format!("key_{idx}"), || async move { Ok::<_, &str>(value) })
                .await
                .unwrap();
        }

        // Keep an entry only when its key is `key_<n>` with an even `n`;
        // anything that does not parse is dropped.
        cache.retain(|key, _| {
            key.strip_prefix("key_")
                .and_then(|digits| digits.parse::<i32>().ok())
                .is_some_and(|num| num % 2 == 0)
        });

        // Even keys survive with their values, odd keys are gone.
        for idx in 0..5 {
            let expected = (idx % 2 == 0).then(|| format!("value_{idx}"));
            assert_eq!(cache.get(&format!("key_{idx}")), expected);
        }
    }

    // A waiter coalesced onto an initializer that gets aborted must observe
    // `CoalescedRequestFailed`.
    #[tokio::test]
    async fn test_coalesced_request_failed_error() {
        let map = Arc::new(CoalescedMap::new());
        // Use a 3-party barrier so the test task can coordinate
        // that both spawned tasks reached the rendezvous before aborting.
        let barrier = Arc::new(tokio::sync::Barrier::new(3));

        let map1 = map.clone();
        let barrier1 = barrier.clone();
        let handle1 = tokio::spawn(async move {
            map1.get_or_try_init("test_key".to_string(), || async move {
                // Reaching the barrier from inside the initializer means the
                // key is already marked pending by this task.
                barrier1.wait().await;
                // Simulate a task that gets cancelled/dropped
                // Sleep forever to ensure it doesn't complete
                // before we abort it from the test thread.
                let () = pending().await;
                Ok::<_, &str>("value".to_string())
            })
            .await
        });

        let map2 = map.clone();
        let barrier2 = barrier.clone();
        let handle2 = tokio::spawn(async move {
            // Rendezvous first, so the first task's initializer is known to
            // be running before this request is made
            barrier2.wait().await;

            // This should subscribe to the first task's broadcast
            // NOTE(review): this presumes the subscription happens before the
            // abort below takes effect — confirm the ordering on the test
            // runtime; otherwise this task would run its own initializer.
            map2.get_or_try_init("test_key".to_string(), || async move {
                Ok::<_, &str>("should_not_be_called".to_string())
            })
            .await
        });

        // Wait till both tasks reach the barrier
        barrier.wait().await;

        // Cancel the first task to simulate a coalesced request failure
        handle1.abort();

        // The second task should receive a CoalescedRequestFailed error
        let result = handle2.await.unwrap();
        match result {
            Err(CoalescedGetError::CoalescedRequestFailed) => {
                // This is expected
            }
            _ => panic!("Expected CoalescedRequestFailed error, got {result:?}"),
        }
    }

    // A waiter coalesced onto an initializer that panics must observe
    // `CoalescedRequestFailed`.
    #[tokio::test]
    async fn test_coalesced_request_failed_panic() {
        let map = Arc::new(CoalescedMap::new());
        // Use a 3-party barrier so the test task can coordinate
        // that both spawned tasks reached the rendezvous before continuing.
        let barrier = Arc::new(tokio::sync::Barrier::new(3));

        let map1 = map.clone();
        let barrier1 = barrier.clone();
        // The explicit handle type pins the value and error types, which the
        // panicking initializer cannot provide on its own.
        let handle1: JoinHandle<Result<String, CoalescedGetError<&'static str>>> =
            tokio::spawn(async move {
                map1.get_or_try_init("test_key".to_string(), || async move {
                    // Reaching the barrier from inside the initializer means
                    // the key is already marked pending by this task.
                    barrier1.wait().await;
                    // Fail the initialization by panicking instead of
                    // returning a value.
                    panic!();
                })
                .await
            });

        let map2 = map.clone();
        let barrier2 = barrier.clone();
        let handle2 = tokio::spawn(async move {
            // Rendezvous first, so the first task's initializer is known to
            // be running before this request is made
            barrier2.wait().await;

            // This should subscribe to the first task's broadcast
            // NOTE(review): this presumes the subscription happens before the
            // first task's panic (or the abort below) tears down its sender —
            // confirm the ordering on the test runtime.
            map2.get_or_try_init("test_key".to_string(), || async move {
                Ok::<_, &str>("should_not_be_called".to_string())
            })
            .await
        });

        // Wait till both tasks reach the barrier
        barrier.wait().await;

        // The first task panics by itself once released from the barrier;
        // aborting it as well only guarantees it is torn down either way.
        handle1.abort();

        // The second task should receive a CoalescedRequestFailed error
        let result = handle2.await.unwrap();
        match result {
            Err(CoalescedGetError::CoalescedRequestFailed) => {
                // This is expected
            }
            _ => panic!("Expected CoalescedRequestFailed error, got {result:?}"),
        }
    }

    #[tokio::test]
    async fn test_empty_map() {
        let cache: CoalescedMap<String, String> = CoalescedMap::new();

        // A fresh map reports no entries and misses on every lookup.
        assert_eq!(cache.len(), 0);
        assert!(cache.is_empty());
        assert_eq!(cache.get(&String::from("nonexistent")), None);

        // One successful initialization makes the map non-empty.
        let stored = cache
            .get_or_try_init(String::from("key"), || async {
                Ok::<_, &str>(String::from("value"))
            })
            .await;
        stored.unwrap();

        assert_eq!(cache.len(), 1);
        assert!(!cache.is_empty());
    }

    #[tokio::test]
    async fn test_custom_hasher() {
        use std::collections::hash_map::RandomState;

        // A map built from an explicitly supplied hasher behaves like any other.
        let plain: CoalescedMap<String, String, RandomState> =
            CoalescedMap::with_hasher(RandomState::new());
        let first = plain
            .get_or_try_init(String::from("key1"), || async {
                Ok::<_, &str>(String::from("value1"))
            })
            .await
            .unwrap();
        assert_eq!(first, "value1");

        // The same holds when an initial capacity is supplied alongside the hasher.
        let sized: CoalescedMap<String, String, RandomState> =
            CoalescedMap::with_capacity_and_hasher(10, RandomState::new());
        let second = sized
            .get_or_try_init(String::from("key2"), || async {
                Ok::<_, &str>(String::from("value2"))
            })
            .await
            .unwrap();
        assert_eq!(second, "value2");
    }

    // `clear` must drop fetched values while leaving an entry whose
    // initialization is still running untouched.
    #[tokio::test]
    async fn test_clear_removes_fetched_keeps_pending() {
        let map = Arc::new(CoalescedMap::new());

        // Create a pending entry by starting an initialization that never completes.
        // The 2-party barrier pairs that initializer with the test task.
        let barrier = Arc::new(tokio::sync::Barrier::new(2));
        let map1 = map.clone();
        let barrier1 = barrier.clone();
        let pending_handle = tokio::spawn(async move {
            map1.get_or_try_init("pending".to_string(), || {
                let barrier = barrier1.clone();
                async move {
                    // Signal the test that the init started and entry is pending.
                    barrier.wait().await;
                    // Never complete; simulate long-running in-flight work.
                    let () = pending().await;
                    // Only present to give the initializer its result type.
                    #[allow(unreachable_code)]
                    Ok::<_, &str>("never".to_string())
                }
            })
            .await
        });

        // Wait for the initializer to start and set the entry to Pending.
        barrier.wait().await;

        // Insert a fetched entry.
        map.get_or_try_init("fetched".to_string(), || async {
            Ok::<_, &str>("value".to_string())
        })
        .await
        .unwrap();

        // Sanity: we have both entries. `get` only reports fetched values, so
        // the pending key reads as `None` even though it is counted by `len`.
        assert_eq!(map.len(), 2);
        assert_eq!(map.get(&"fetched".to_string()), Some("value".to_string()));
        assert_eq!(map.get(&"pending".to_string()), None);

        // Clear should remove fetched entries but keep pending ones.
        map.clear();

        assert_eq!(map.len(), 1, "only the pending entry should remain");
        assert_eq!(map.get(&"fetched".to_string()), None);
        assert_eq!(map.get(&"pending".to_string()), None);

        // Clean up the spawned pending task to avoid leaking.
        // The join result (a cancellation error) is deliberately ignored.
        pending_handle.abort();
        let _ = pending_handle.await;
    }
}