announcement/
lib.rs

1//! A runtime-agnostic oneshot broadcast channel for Rust.
2//!
3//! Copyright (c) 2025 Stephen Waits <steve@waits.net>
4//!
5//! This crate provides a simple way to broadcast a value once to multiple listeners.
6//! It's built on `Arc<OnceLock>` and `event-listener` for efficiency and runtime agnosticism.
7//!
8//! # Features
9//!
10//! - **Simple API** - Create, announce, listen
11//! - **Fast** - Built on `Arc<OnceLock>` and `event-listener`
12//! - **Runtime-agnostic** - Works with tokio, async-std, smol, or any executor
13//! - **Type-safe** - Can only announce once, listeners are cloneable
14//! - **Lightweight** - Minimal dependencies
15//! - **Thread-safe** - All types are Send + Sync when T: Send + Sync
16//! - **Cancel-safe** - Async operations can be safely cancelled
17//!
18//! # Feature Flags
19//!
20//! - `default` - Includes std feature
21//! - `std` - Enables std library support (required for blocking operations: `listen_blocking()`, `listen_timeout_blocking()`)
22//! - `tracing` - Enables tracing integration for debugging
23//!
24//! # Quick Start
25//!
26//! ```ignore
27//! use announcement::Announcement;
28//!
29//! #[tokio::main]
30//! async fn main() {
31//!     // Create announcement channel
32//!     let (announcer, announcement) = Announcement::new();
33//!
34//!     // Create multiple listeners
35//!     let listener1 = announcement.listener();
36//!     let listener2 = announcement.listener();
37//!
38//!     // Spawn tasks
39//!     tokio::spawn(async move {
40//!         let value = listener1.listen().await;
41//!         println!("Listener 1: {:?}", value);
42//!     });
43//!
44//!     tokio::spawn(async move {
45//!         let value = listener2.listen().await;
46//!         println!("Listener 2: {:?}", value);
47//!     });
48//!
49//!     // Announce to all listeners
50//!     announcer.announce(42).unwrap();
51//! }
52//! ```
53//!
54//! # Use Cases
55//!
56//! - **Shutdown signals** - Notify all tasks to shut down
57//! - **Configuration broadcast** - Share initialized config to all workers
58//! - **Lazy initialization** - Signal when a resource is ready
59//! - **Event fanout** - Broadcast a one-time event to multiple subscribers
60//!
61//! # Comparison to Alternatives
62//!
63//! Unlike `tokio::sync::broadcast`, `announcement` is:
64//! - Single-shot (announce once)
65//! - Runtime-agnostic
66//! - Simpler API for one-time broadcasts
67//!
68//! # Performance
69//!
70//! - Announcement creation: ~200ns
71//! - Listener creation: ~20ns
72//! - Announce operation: ~10-40ns
73//! - Listen (already announced): ~5-10ns
74//!
75//! For N listeners with `Arc<T>`:
76//! - Memory: O(1) - single allocation of T
77//! - Time: O(N) arc clones (~10ns each)
78//!
79//! # Thread Safety & Memory Model
80//!
81//! All types (`Announcement`, `Announcer`, `Listener`) are `Send + Sync` when `T: Send + Sync`.
82//!
83//! ## Memory Ordering Guarantees
84//!
85//! - **Announce → Listen**: The announce operation has a happens-before relationship with
86//!   all listen operations. Any writes before `announce()` are visible after `listen()` returns.
87//! - **Closed Flag**: Uses SeqCst ordering to ensure visibility of the closed state across
88//!   all threads, including on weakly-ordered architectures (ARM, RISC-V, PowerPC).
89//! - **Value Storage**: `OnceLock` provides its own synchronization guarantees.
90//!
91//! ## TOCTOU Protection
92//!
93//! All async listen operations use a double-check pattern to prevent Time-Of-Check-Time-Of-Use
94//! races between checking for a value and waiting for notification.
95//!
96//! # Cancel Safety
97//!
98//! The `listen()` method is cancel-safe: dropping the future will not lose the announced value.
99//! You can safely use it with `select!`, `timeout`, or any operation that might cancel the future.
100//! The value remains available for subsequent `listen()` or `try_listen()` calls.
101
102use std::sync::atomic::{AtomicBool, Ordering};
103use std::sync::{Arc, OnceLock};
104
105use event_listener::Event;
106
107#[cfg(feature = "std")]
108use event_listener::Listener as EventListenerTrait;
109
110#[cfg(feature = "std")]
111use std::time::{Duration, Instant};
112
113/// Internal shared state for the announcement channel.
114struct AnnouncementInner<T> {
115    /// The announced value (set exactly once).
116    value: OnceLock<T>,
117    /// Whether the announcer was closed without announcing.
118    closed: AtomicBool,
119}
120
121/// The main announcement channel type that creates listeners.
122///
123/// This is the primary interface for creating listeners that will receive
124/// the announced value. It can be cloned cheaply to share across threads.
125///
126/// # Examples
127///
128/// ```
129/// use announcement::Announcement;
130///
131/// let (announcer, announcement) = Announcement::<i32>::new();
132/// let listener1 = announcement.listener();
133/// let listener2 = announcement.listener();
134///
135/// announcer.announce(42).unwrap();
136///
137/// assert_eq!(listener1.try_listen(), Some(42));
138/// assert_eq!(listener2.try_listen(), Some(42));
139/// ```
140#[must_use = "Announcement should be used to create listeners"]
141#[derive(Clone)]
142pub struct Announcement<T> {
143    inner: Arc<AnnouncementInner<T>>,
144    event: Arc<Event>,
145}
146
147/// The sending end of the announcement channel.
148///
149/// This type is NOT `Clone` - it can only be used once to ensure
150/// that a value can only be announced once.
151///
152/// # Examples
153///
154/// ```
155/// use announcement::Announcement;
156///
157/// let (announcer, announcement) = Announcement::<String>::new();
158/// announcer.announce("Hello!".to_string()).unwrap();
159/// ```
160#[must_use = "Announcer must be used; call announce() or close()"]
161pub struct Announcer<T> {
162    inner: Arc<AnnouncementInner<T>>,
163    event: Arc<Event>,
164}
165
166/// A listener that waits for an announced value.
167///
168/// This type is `Clone` and can be shared across multiple tasks or threads.
169/// All clones will receive the same value when it's announced.
170///
171/// # Examples
172///
173/// ```
174/// use announcement::Announcement;
175///
176/// let (announcer, announcement) = Announcement::new();
177/// let listener = announcement.listener();
178///
179/// announcer.announce(42).unwrap();
180/// assert_eq!(listener.try_listen(), Some(42));
181/// ```
182#[must_use = "Listener must be used; call listen() or try_listen()"]
183#[derive(Clone)]
184pub struct Listener<T> {
185    inner: Arc<AnnouncementInner<T>>,
186    event: Arc<Event>,
187}
188
189/// Error returned when attempting to announce a value.
190///
191/// # Examples
192///
193/// ```
194/// use announcement::{Announcement, AnnounceError};
195///
196/// let (announcer, announcement) = Announcement::new();
197/// announcer.announce(42).unwrap();
198///
199/// // Cannot announce again - would need a second announcer which can't be created
200/// let err = AnnounceError::AlreadyAnnounced(99);
201/// assert_eq!(err.into_inner(), 99);
202/// ```
203#[derive(Debug, Clone, PartialEq, Eq)]
204pub enum AnnounceError<T> {
205    /// A value was already announced.
206    /// Contains the value you tried to send.
207    AlreadyAnnounced(T),
208}
209
210impl<T> AnnounceError<T> {
211    /// Extract the value from the error.
212    ///
213    /// # Examples
214    ///
215    /// ```
216    /// use announcement::AnnounceError;
217    ///
218    /// let err = AnnounceError::AlreadyAnnounced(42);
219    /// assert_eq!(err.into_inner(), 42);
220    /// ```
221    pub fn into_inner(self) -> T {
222        match self {
223            AnnounceError::AlreadyAnnounced(val) => val,
224        }
225    }
226}
227
228impl<T> std::fmt::Display for AnnounceError<T> {
229    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
230        match self {
231            AnnounceError::AlreadyAnnounced(_) => {
232                write!(f, "a value has already been announced")
233            }
234        }
235    }
236}
237
238impl<T: std::fmt::Debug> std::error::Error for AnnounceError<T> {}
239
240/// Error returned when listening for a value.
241///
242/// # Examples
243///
244/// ```
245/// use announcement::ListenError;
246///
247/// let err = ListenError::Timeout;
248/// assert_eq!(err.to_string(), "timeout elapsed while waiting for announcement");
249///
250/// let err = ListenError::Closed;
251/// assert_eq!(err.to_string(), "announcer was dropped without announcing");
252/// ```
253#[derive(Debug, Clone, Copy, PartialEq, Eq)]
254pub enum ListenError {
255    /// Timeout elapsed without receiving a value
256    Timeout,
257    /// Announcer was dropped without announcing
258    Closed,
259}
260
261impl std::fmt::Display for ListenError {
262    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
263        match self {
264            ListenError::Timeout => write!(f, "timeout elapsed while waiting for announcement"),
265            ListenError::Closed => write!(f, "announcer was dropped without announcing"),
266        }
267    }
268}
269
270impl std::error::Error for ListenError {}
271
272impl<T: Clone + Send + Sync + 'static> Announcement<T> {
273    /// Create a new announcement channel.
274    ///
275    /// Returns a tuple of `(Announcer, Announcement)`. The announcer is used to
276    /// broadcast the value once, and the announcement is used to create listeners.
277    ///
278    /// # Examples
279    ///
280    /// ```
281    /// use announcement::Announcement;
282    ///
283    /// let (announcer, announcement) = Announcement::<i32>::new();
284    /// ```
285    pub fn new() -> (Announcer<T>, Self) {
286        #[cfg(feature = "tracing")]
287        tracing::trace!("Creating new announcement channel");
288
289        let inner = Arc::new(AnnouncementInner {
290            value: OnceLock::new(),
291            closed: AtomicBool::new(false),
292        });
293        let event = Arc::new(Event::new());
294
295        let announcer = Announcer {
296            inner: Arc::clone(&inner),
297            event: Arc::clone(&event),
298        };
299
300        let announcement = Self { inner, event };
301
302        (announcer, announcement)
303    }
304
305    /// Create a new listener for this announcement.
306    ///
307    /// Can be called before or after announcing. Can be called multiple times
308    /// to create multiple independent listeners that will all receive the same value.
309    ///
310    /// # Performance
311    ///
312    /// O(1), performs 2 Arc clones (~10-20ns)
313    ///
314    /// # Examples
315    ///
316    /// ```
317    /// use announcement::Announcement;
318    ///
319    /// let (announcer, announcement) = Announcement::new();
320    /// let listener1 = announcement.listener();
321    /// let listener2 = announcement.listener();
322    ///
323    /// announcer.announce(42).unwrap();
324    ///
325    /// assert_eq!(listener1.try_listen(), Some(42));
326    /// assert_eq!(listener2.try_listen(), Some(42));
327    /// ```
328    pub fn listener(&self) -> Listener<T> {
329        Listener {
330            inner: Arc::clone(&self.inner),
331            event: Arc::clone(&self.event),
332        }
333    }
334
335    /// Check if a value has been announced.
336    ///
337    /// This is a non-blocking, lock-free operation.
338    ///
339    /// # Performance
340    ///
341    /// O(1), ~5-10ns
342    ///
343    /// # Examples
344    ///
345    /// ```
346    /// use announcement::Announcement;
347    ///
348    /// let (announcer, announcement) = Announcement::<i32>::new();
349    /// assert!(!announcement.is_announced());
350    ///
351    /// announcer.announce(42).unwrap();
352    /// assert!(announcement.is_announced());
353    /// ```
354    #[inline]
355    pub fn is_announced(&self) -> bool {
356        self.inner.value.get().is_some()
357    }
358
359    /// Check if the announcer was closed without announcing.
360    ///
361    /// Returns `true` if the announcer was explicitly closed or dropped without announcing.
362    /// This is a non-blocking, lock-free operation.
363    ///
364    /// # Performance
365    ///
366    /// O(1), ~5-10ns
367    ///
368    /// # Examples
369    ///
370    /// ```
371    /// use announcement::Announcement;
372    ///
373    /// let (announcer, announcement) = Announcement::<i32>::new();
374    /// assert!(!announcement.is_closed());
375    ///
376    /// drop(announcer);
377    /// assert!(announcement.is_closed());
378    /// ```
379    #[inline]
380    pub fn is_closed(&self) -> bool {
381        self.inner.closed.load(Ordering::SeqCst)
382    }
383}
384
385impl<T> Announcer<T> {
386    /// Announce a value to all listeners.
387    ///
388    /// This consumes the announcer, enforcing one-time use. All waiting listeners
389    /// will be woken up and can retrieve the value.
390    ///
391    /// # Returns
392    ///
393    /// - `Ok(())` - Value successfully announced
394    /// - `Err(AnnounceError::AlreadyAnnounced(val))` - Already announced (unreachable)
395    ///
396    /// **Note**: This error is **unreachable** with the public API because `Announcer` is not
397    /// `Clone` and `announce()` consumes `self`, making it impossible to call twice. The error
398    /// exists for soundness (required by `OnceLock::set()`) but will never occur in correct usage.
399    ///
400    /// # Performance
401    ///
402    /// ~10-40ns for the announce operation, plus O(N) time to wake N waiting listeners.
403    /// This is a non-blocking operation.
404    ///
405    /// # Thread Safety
406    ///
407    /// This method is thread-safe and can be called from any thread. The announced value
408    /// will be visible to all listeners across all threads.
409    ///
410    /// # Examples
411    ///
412    /// ```
413    /// use announcement::Announcement;
414    ///
415    /// let (announcer, announcement) = Announcement::new();
416    /// let listener = announcement.listener();
417    ///
418    /// announcer.announce(42).unwrap();
419    /// assert_eq!(listener.try_listen(), Some(42));
420    /// ```
421    pub fn announce(self, val: T) -> Result<(), AnnounceError<T>> {
422        #[cfg(feature = "tracing")]
423        tracing::debug!("Announcing value");
424
425        // Try to set the value (atomic operation)
426        // Note: AlreadyAnnounced is unreachable because Announcer is not Clone
427        // and announce() consumes self, making it impossible to call twice.
428        // The error case is required by OnceLock::set() but cannot occur in practice.
429        // COVERAGE: The .map_err line below is UNREACHABLE and cannot be tested.
430        self.inner
431            .value
432            .set(val)
433            .map_err(AnnounceError::AlreadyAnnounced)?;
434
435        // Wake all waiting listeners
436        #[cfg(feature = "tracing")]
437        tracing::trace!("Notifying all waiting listeners");
438        self.event.notify(usize::MAX);
439
440        #[cfg(feature = "tracing")]
441        tracing::debug!("Value announced successfully");
442
443        Ok(())
444    }
445
446    /// Close the announcer without announcing a value.
447    ///
448    /// This explicitly closes the channel, notifying all listeners that no value
449    /// will be announced. All waiting listeners will wake up and receive
450    /// `Err(ListenError::Closed)`.
451    ///
452    /// **Note on `close()` vs Drop**: This method consumes `self`, so Rust's Drop
453    /// implementation runs immediately afterward. The Drop impl also closes the channel
454    /// if no value was announced, making `close()` and just dropping the announcer
455    /// functionally equivalent. Use `close()` for explicitness and clarity of intent.
456    ///
457    /// # Examples
458    ///
459    /// ```
460    /// use announcement::Announcement;
461    ///
462    /// let (announcer, announcement) = Announcement::<i32>::new();
463    /// let listener = announcement.listener();
464    ///
465    /// announcer.close();
466    /// // Listener would receive Err(ListenError::Closed)
467    /// ```
468    ///
469    /// Dropping the announcer has the same effect:
470    ///
471    /// ```
472    /// use announcement::{Announcement, ListenError};
473    ///
474    /// let (announcer, announcement) = Announcement::<i32>::new();
475    /// let listener = announcement.listener();
476    ///
477    /// drop(announcer); // Auto-closes via Drop impl
478    /// assert_eq!(listener.try_listen(), None);
479    /// assert_eq!(listener.is_closed(), true);
480    /// ```
481    ///
482    /// # Mutation Testing Note
483    ///
484    /// This method is marked with `#[cfg_attr(test, mutants::skip)]` because mutations
485    /// to this function cannot be detected by tests. The reason: `close(self)` consumes
486    /// self, which immediately triggers the Drop implementation. Drop performs identical
487    /// operations (set closed flag + notify listeners), making close() and Drop functionally
488    /// equivalent. Any mutation to close() is masked by Drop's safety net, which is by design.
489    ///
490    /// This is not a testing gap - it reflects the intentional redundancy where Drop serves
491    /// as a guarantee that listeners are never left hanging, even if close() is not called.
492    #[cfg_attr(test, mutants::skip)]
493    pub fn close(self) {
494        #[cfg(feature = "tracing")]
495        tracing::debug!("Closing announcer without announcing");
496
497        // Set the closed flag
498        self.inner.closed.store(true, Ordering::SeqCst);
499
500        // Wake all waiting listeners
501        #[cfg(feature = "tracing")]
502        tracing::trace!("Notifying all waiting listeners of closure");
503        self.event.notify(usize::MAX);
504    }
505}
506
507impl<T> Drop for Announcer<T> {
508    /// Automatically close the channel when the announcer is dropped without announcing.
509    ///
510    /// This ensures listeners don't wait indefinitely if the announcer is dropped.
511    ///
512    /// # Behavior
513    ///
514    /// If neither `announce()` nor `close()` was called, dropping the announcer will:
515    /// 1. Set the closed flag (with Release ordering)
516    /// 2. Wake all waiting listeners
517    /// 3. Cause future `listen()` calls to return `Err(ListenError::Closed)`
518    ///
519    /// If a value was already announced, dropping does nothing (listeners already have the value).
520    ///
521    /// # Example
522    ///
523    /// ```
524    /// use announcement::{Announcement, ListenError};
525    ///
526    /// let (announcer, announcement) = Announcement::<i32>::new();
527    /// let listener = announcement.listener();
528    ///
529    /// // Announcer dropped here without calling announce() or close()
530    /// drop(announcer);
531    ///
532    /// // Listeners are notified of closure
533    /// assert!(announcement.is_closed());
534    /// // Non-blocking check sees no value
535    /// assert_eq!(listener.try_listen(), None);
536    /// ```
537    ///
538    /// # Mutation Testing Note
539    ///
540    /// The Drop implementation is marked with `#[cfg_attr(test, mutants::skip)]` because
541    /// mutations to the guard condition cause test timeouts rather than failures:
542    /// - Replacing the body causes listeners to hang indefinitely
543    /// - Mutating the boolean logic (&&, !) causes listeners to never be notified
544    ///
545    /// These timeouts actually prove the tests would catch the bugs, but at the cost
546    /// of 60+ second hangs per mutation. The existing drop tests provide adequate coverage.
547    #[cfg_attr(test, mutants::skip)]
548    fn drop(&mut self) {
549        // Only close if we haven't announced yet
550        if self.inner.value.get().is_none() && !self.inner.closed.load(Ordering::SeqCst) {
551            #[cfg(feature = "tracing")]
552            tracing::debug!("Announcer dropped without announcing, auto-closing");
553
554            // Set the closed flag
555            self.inner.closed.store(true, Ordering::SeqCst);
556
557            // Wake all waiting listeners
558            #[cfg(feature = "tracing")]
559            tracing::trace!("Notifying all waiting listeners of auto-closure");
560            self.event.notify(usize::MAX);
561        }
562    }
563}
564
565impl<T: Clone> Listener<T> {
566    /// Try to receive without waiting.
567    ///
568    /// Returns `Some(value)` if a value has been announced, or `None` if not yet announced.
569    ///
570    /// # Performance
571    ///
572    /// O(1), ~5-10ns, non-blocking
573    ///
574    /// # Examples
575    ///
576    /// ```
577    /// use announcement::Announcement;
578    ///
579    /// let (announcer, announcement) = Announcement::new();
580    /// let listener = announcement.listener();
581    ///
582    /// assert_eq!(listener.try_listen(), None);
583    ///
584    /// announcer.announce(42).unwrap();
585    /// assert_eq!(listener.try_listen(), Some(42));
586    /// ```
587    #[inline]
588    pub fn try_listen(&self) -> Option<T> {
589        self.inner.value.get().cloned()
590    }
591
592    /// Check if the announcer was closed without announcing.
593    ///
594    /// Returns `true` if the announcer was explicitly closed or dropped without announcing.
595    /// This is a non-blocking, lock-free operation.
596    ///
597    /// # Performance
598    ///
599    /// O(1), ~5-10ns
600    ///
601    /// # Examples
602    ///
603    /// ```
604    /// use announcement::Announcement;
605    ///
606    /// let (announcer, announcement) = Announcement::<i32>::new();
607    /// let listener = announcement.listener();
608    /// assert!(!listener.is_closed());
609    ///
610    /// drop(announcer);
611    /// assert!(listener.is_closed());
612    /// ```
613    #[inline]
614    pub fn is_closed(&self) -> bool {
615        self.inner.closed.load(Ordering::SeqCst)
616    }
617}
618
619impl<T: Clone> Listener<T> {
620    /// Wait for and receive the announced value (async).
621    ///
622    /// Waits until a value is announced or the announcer is closed. Multiple calls
623    /// return the same result.
624    ///
625    /// # Returns
626    ///
627    /// - `Ok(value)` - A value was announced
628    /// - `Err(ListenError::Closed)` - The announcer was dropped or closed without announcing
629    ///
630    /// # Cancel Safety
631    ///
632    /// This method is **cancel-safe**. Dropping the future (e.g., via `select!` or `timeout`)
633    /// will not lose the announced value. The value remains available for subsequent calls
634    /// to `listen()` or `try_listen()`.
635    ///
636    /// You can safely use this method with:
637    /// - `tokio::select!` - Choose between multiple futures
638    /// - `tokio::time::timeout` - Add a timeout
639    /// - Any other cancellation mechanism
640    ///
641    /// # Multiple Calls
642    ///
643    /// Calling `listen()` multiple times (even after it returns) will always return the
644    /// same value. The value is stored in an `Arc<OnceLock>` and cloned for each listener.
645    ///
646    /// # Performance
647    ///
648    /// - Fast path (already announced): ~5-10ns
649    /// - Slow path (waiting): ~100ns + wait time
650    ///
651    /// # Examples
652    ///
653    /// ```
654    /// use announcement::Announcement;
655    ///
656    /// # #[tokio::main]
657    /// # async fn main() {
658    /// let (announcer, announcement) = Announcement::new();
659    /// let listener = announcement.listener();
660    ///
661    /// tokio::spawn(async move {
662    ///     announcer.announce(42).unwrap();
663    /// });
664    ///
665    /// let value = listener.listen().await.unwrap();
666    /// assert_eq!(value, 42);
667    /// # }
668    /// ```
669    pub async fn listen(&self) -> Result<T, ListenError> {
670        loop {
671            // Fast path: value already announced
672            if let Some(val) = self.inner.value.get() {
673                #[cfg(feature = "tracing")]
674                tracing::trace!("Value already announced (fast path)");
675                return Ok(val.clone());
676            }
677
678            // Check if closed
679            if self.inner.closed.load(Ordering::SeqCst) {
680                #[cfg(feature = "tracing")]
681                tracing::trace!("Announcer was closed without announcing");
682                return Err(ListenError::Closed);
683            }
684
685            // Slow path: set up listener before waiting
686            #[cfg(feature = "tracing")]
687            tracing::trace!("Waiting for value to be announced");
688
689            let event_listener = self.event.listen();
690
691            // Check again (TOCTOU protection)
692            // This prevents the race where the value is set after we check
693            // but before we register the listener
694            if let Some(val) = self.inner.value.get() {
695                #[cfg(feature = "tracing")]
696                tracing::trace!("Value announced during listener setup");
697                return Ok(val.clone());
698            }
699
700            // Check closed again
701            if self.inner.closed.load(Ordering::SeqCst) {
702                #[cfg(feature = "tracing")]
703                tracing::trace!("Announcer closed during listener setup");
704                return Err(ListenError::Closed);
705            }
706
707            // Wait for notification
708            event_listener.await;
709
710            #[cfg(feature = "tracing")]
711            tracing::trace!("Received notification, checking for value or closure");
712        }
713    }
714}
715
716#[cfg(feature = "std")]
717impl<T: Clone> Listener<T> {
718    /// Blocking wait for the announced value.
719    ///
720    /// This method blocks the current thread until a value is announced or the
721    /// announcer is closed. FOR NON-ASYNC CONTEXTS ONLY.
722    ///
723    /// **Requires**: `std` feature (enabled by default)
724    ///
725    /// # Returns
726    ///
727    /// - `Ok(value)` - A value was announced
728    /// - `Err(ListenError::Closed)` - The announcer was dropped or closed without announcing
729    ///
730    /// # ⚠️ Warning: Async Context
731    ///
732    /// **DO NOT** use this method in async functions or tasks. It will block the executor
733    /// thread, preventing other tasks from running. This can cause:
734    /// - Deadlocks in async runtimes
735    /// - Poor performance degradation
736    /// - Runtime warnings or panics
737    ///
738    /// In async contexts, use [`listen()`](Self::listen) instead.
739    ///
740    /// # When to Use
741    ///
742    /// Use this method when:
743    /// - You're in a non-async context (regular threads, main function)
744    /// - You want to synchronize between threads using standard library threads
745    /// - You're interfacing with blocking APIs
746    ///
747    /// # Thread Safety
748    ///
749    /// This method is thread-safe and can be called from any thread. Multiple threads
750    /// can block on different listeners simultaneously.
751    ///
752    /// # Examples
753    ///
754    /// ```
755    /// use announcement::Announcement;
756    /// use std::thread;
757    /// use std::time::Duration;
758    ///
759    /// let (announcer, announcement) = Announcement::new();
760    /// let listener = announcement.listener();
761    ///
762    /// thread::spawn(move || {
763    ///     thread::sleep(Duration::from_millis(10));
764    ///     announcer.announce(42).unwrap();
765    /// });
766    ///
767    /// let value = listener.listen_blocking().unwrap();
768    /// assert_eq!(value, 42);
769    /// ```
770    pub fn listen_blocking(&self) -> Result<T, ListenError> {
771        loop {
772            if let Some(val) = self.inner.value.get() {
773                return Ok(val.clone());
774            }
775
776            if self.inner.closed.load(Ordering::SeqCst) {
777                return Err(ListenError::Closed);
778            }
779
780            let listener = self.event.listen();
781
782            if let Some(val) = self.inner.value.get() {
783                return Ok(val.clone());
784            }
785
786            if self.inner.closed.load(Ordering::SeqCst) {
787                return Err(ListenError::Closed);
788            }
789
790            // Block current thread
791            EventListenerTrait::wait(listener);
792        }
793    }
794
795    /// Blocking wait with timeout.
796    ///
797    /// Returns the value if it's announced within the timeout period.
798    ///
799    /// **Requires**: `std` feature (enabled by default)
800    ///
801    /// # Returns
802    ///
803    /// - `Ok(val)` - Value received within timeout
804    /// - `Err(ListenError::Timeout)` - Timeout elapsed without receiving a value
805    /// - `Err(ListenError::Closed)` - The announcer was dropped or closed without announcing
806    ///
807    /// # ⚠️ Warning: Async Context
808    ///
809    /// **DO NOT** use this method in async functions or tasks. It will block the executor
810    /// thread. In async contexts, use [`listen()`](Self::listen) with `tokio::time::timeout`
811    /// or similar timeout mechanisms instead.
812    ///
813    /// # Examples
814    ///
815    /// ```
816    /// use announcement::Announcement;
817    /// use std::time::Duration;
818    ///
819    /// let (announcer, announcement) = Announcement::new();
820    /// let listener = announcement.listener();
821    ///
822    /// announcer.announce(42).unwrap();
823    ///
824    /// let result = listener.listen_timeout_blocking(Duration::from_secs(1));
825    /// assert_eq!(result, Ok(42));
826    /// ```
827    pub fn listen_timeout_blocking(&self, duration: Duration) -> Result<T, ListenError> {
828        let deadline = Instant::now() + duration;
829
830        loop {
831            if let Some(val) = self.inner.value.get() {
832                return Ok(val.clone());
833            }
834
835            if self.inner.closed.load(Ordering::SeqCst) {
836                return Err(ListenError::Closed);
837            }
838
839            let listener = self.event.listen();
840
841            if let Some(val) = self.inner.value.get() {
842                return Ok(val.clone());
843            }
844
845            if self.inner.closed.load(Ordering::SeqCst) {
846                return Err(ListenError::Closed);
847            }
848
849            let remaining = deadline.saturating_duration_since(Instant::now());
850            if remaining.is_zero() {
851                return Err(ListenError::Timeout);
852            }
853
854            if EventListenerTrait::wait_timeout(listener, remaining).is_none() {
855                return Err(ListenError::Timeout);
856            }
857        }
858    }
859}
860
861// Send and Sync are automatically derived for our types because:
862// - Arc<OnceLock<T>> is Send + Sync when T: Send + Sync
863// - Arc<Event> is Send + Sync (Event guarantees this)
864// No unsafe code needed!
865
866#[cfg(test)]
867mod tests {
868    use super::*;
869
870    /// Test: Unit test: is_announced() method
871    ///
872    /// Verifies: is_announced() correctly reports announcement state
873    /// Category: unit
874    #[test]
875    fn test_is_announced() {
876        let (announcer, announcement) = Announcement::<i32>::new();
877        assert!(!announcement.is_announced());
878        announcer.announce(42).unwrap();
879        assert!(announcement.is_announced());
880    }
881
882    /// Test: Unit test: try_listen before announce
883    ///
884    /// Verifies: try_listen() returns None before announce and Some(value) after
885    /// Category: unit
886    #[test]
887    fn test_try_listen_before_announce() {
888        let (announcer, announcement) = Announcement::<i32>::new();
889        let listener = announcement.listener();
890        assert_eq!(listener.try_listen(), None);
891        announcer.announce(42).unwrap();
892        assert_eq!(listener.try_listen(), Some(42));
893    }
894
895    /// Test: Unit test: try_listen after announce
896    ///
897    /// Verifies: try_listen() returns Some(value) when called after announce
898    /// Category: unit
899    #[test]
900    fn test_try_listen_after_announce() {
901        let (announcer, announcement) = Announcement::<i32>::new();
902        announcer.announce(42).unwrap();
903        let listener = announcement.listener();
904        assert_eq!(listener.try_listen(), Some(42));
905    }
906
907    /// Test: Unit test: listener cloning
908    ///
909    /// Verifies: Cloned listeners both receive the announced value
910    /// Category: unit
911    #[test]
912    fn test_clone_listener() {
913        let (announcer, announcement) = Announcement::<i32>::new();
914        let listener1 = announcement.listener();
915        let listener2 = listener1.clone();
916
917        announcer.announce(42).unwrap();
918
919        assert_eq!(listener1.try_listen(), Some(42));
920        assert_eq!(listener2.try_listen(), Some(42));
921    }
922
923    /// Test: Unit test: multiple listeners
924    ///
925    /// Verifies: Multiple listeners all receive the same announced value
926    /// Category: unit
927    #[test]
928    fn test_multiple_listeners() {
929        let (announcer, announcement) = Announcement::<i32>::new();
930        let l1 = announcement.listener();
931        let l2 = announcement.listener();
932        let l3 = announcement.listener();
933
934        announcer.announce(42).unwrap();
935
936        assert_eq!(l1.try_listen(), Some(42));
937        assert_eq!(l2.try_listen(), Some(42));
938        assert_eq!(l3.try_listen(), Some(42));
939    }
940
941    /// Test: Unit test: announce with no listeners
942    ///
943    /// Verifies: Announcing succeeds even when no listeners exist
944    /// Category: unit
945    #[test]
946    fn test_announce_with_no_listeners() {
947        let (announcer, _announcement) = Announcement::<i32>::new();
948        // No listeners created
949        announcer.announce(42).unwrap(); // Should succeed
950    }
951
952    /// Test: Unit test: AnnounceError::into_inner()
953    ///
954    /// Verifies: into_inner() extracts the value from AnnounceError
955    /// Category: unit
956    #[test]
957    fn test_error_into_inner() {
958        let err = AnnounceError::AlreadyAnnounced(99);
959        assert_eq!(err.into_inner(), 99);
960    }
961
962    /// Test: Unit test: announcement cloning
963    ///
964    /// Verifies: Cloned announcements share the same value
965    /// Category: unit
966    #[test]
967    fn test_clone_announcement() {
968        let (announcer, announcement) = Announcement::<i32>::new();
969        let announcement2 = announcement.clone();
970
971        let l1 = announcement.listener();
972        let l2 = announcement2.listener();
973
974        announcer.announce(42).unwrap();
975
976        assert_eq!(l1.try_listen(), Some(42));
977        assert_eq!(l2.try_listen(), Some(42));
978    }
979
980    /// Test: Unit test: Arc value handling
981    ///
982    /// Verifies: Arc-wrapped values work correctly with announcement
983    /// Category: unit
984    #[test]
985    fn test_with_arc() {
986        use std::sync::Arc;
987
988        #[derive(Clone, Debug, PartialEq)]
989        struct Data {
990            value: Vec<u8>,
991        }
992
993        let (announcer, announcement) = Announcement::new();
994        let data = Arc::new(Data {
995            value: vec![1, 2, 3],
996        });
997
998        let l1 = announcement.listener();
999        let l2 = announcement.listener();
1000
1001        announcer.announce(data.clone()).unwrap();
1002
1003        let r1 = l1.try_listen().unwrap();
1004        let r2 = l2.try_listen().unwrap();
1005
1006        assert_eq!(r1, data);
1007        assert_eq!(r2, data);
1008
1009        // Verify Arc refcount is 4 (original + announcement + r1 + r2)
1010        assert_eq!(Arc::strong_count(&r1), 4);
1011    }
1012
1013    /// Test: Unit test: AnnounceError Display trait
1014    ///
1015    /// Verifies: AnnounceError implements Display correctly
1016    /// Category: unit
1017    #[test]
1018    fn test_error_display() {
1019        let err = AnnounceError::AlreadyAnnounced(42);
1020        assert_eq!(err.to_string(), "a value has already been announced");
1021    }
1022
1023    /// Test: Unit test: AnnounceError Display trait
1024    ///
1025    /// Verifies: AnnounceError implements Display correctly
1026    /// Category: unit
1027    #[test]
1028    fn test_listen_error_display() {
1029        let err = ListenError::Timeout;
1030        assert_eq!(
1031            err.to_string(),
1032            "timeout elapsed while waiting for announcement"
1033        );
1034    }
1035}