announcement/lib.rs
1//! A runtime-agnostic oneshot broadcast channel for Rust.
2//!
3//! Copyright (c) 2025 Stephen Waits <steve@waits.net>
4//!
5//! This crate provides a simple way to broadcast a value once to multiple listeners.
6//! It's built on `Arc<OnceLock>` and `event-listener` for efficiency and runtime agnosticism.
7//!
8//! # Features
9//!
10//! - **Simple API** - Create, announce, listen
11//! - **Fast** - Built on `Arc<OnceLock>` and `event-listener`
12//! - **Runtime-agnostic** - Works with tokio, async-std, smol, or any executor
13//! - **Type-safe** - Can only announce once, listeners are cloneable
14//! - **Lightweight** - Minimal dependencies
15//! - **Thread-safe** - All types are Send + Sync when T: Send + Sync
16//! - **Cancel-safe** - Async operations can be safely cancelled
17//!
18//! # Feature Flags
19//!
20//! - `default` - Includes std feature
21//! - `std` - Enables std library support (required for blocking operations: `listen_blocking()`, `listen_timeout_blocking()`)
22//! - `tracing` - Enables tracing integration for debugging
23//!
24//! # Quick Start
25//!
26//! ```ignore
27//! use announcement::Announcement;
28//!
29//! #[tokio::main]
30//! async fn main() {
31//! // Create announcement channel
32//! let (announcer, announcement) = Announcement::new();
33//!
34//! // Create multiple listeners
35//! let listener1 = announcement.listener();
36//! let listener2 = announcement.listener();
37//!
38//! // Spawn tasks
39//! tokio::spawn(async move {
40//! let value = listener1.listen().await;
41//! println!("Listener 1: {:?}", value);
42//! });
43//!
44//! tokio::spawn(async move {
45//! let value = listener2.listen().await;
46//! println!("Listener 2: {:?}", value);
47//! });
48//!
49//! // Announce to all listeners
50//! announcer.announce(42).unwrap();
51//! }
52//! ```
53//!
54//! # Use Cases
55//!
56//! - **Shutdown signals** - Notify all tasks to shut down
57//! - **Configuration broadcast** - Share initialized config to all workers
58//! - **Lazy initialization** - Signal when a resource is ready
59//! - **Event fanout** - Broadcast a one-time event to multiple subscribers
60//!
61//! # Comparison to Alternatives
62//!
63//! Unlike `tokio::sync::broadcast`, `announcement` is:
64//! - Single-shot (announce once)
65//! - Runtime-agnostic
66//! - Simpler API for one-time broadcasts
67//!
68//! # Performance
69//!
70//! - Announcement creation: ~200ns
71//! - Listener creation: ~20ns
72//! - Announce operation: ~10-40ns
73//! - Listen (already announced): ~5-10ns
74//!
75//! For N listeners with `Arc<T>`:
76//! - Memory: O(1) - single allocation of T
77//! - Time: O(N) arc clones (~10ns each)
78//!
79//! # Thread Safety & Memory Model
80//!
81//! All types (`Announcement`, `Announcer`, `Listener`) are `Send + Sync` when `T: Send + Sync`.
82//!
83//! ## Memory Ordering Guarantees
84//!
85//! - **Announce → Listen**: The announce operation has a happens-before relationship with
86//! all listen operations. Any writes before `announce()` are visible after `listen()` returns.
87//! - **Closed Flag**: Uses SeqCst ordering to ensure visibility of the closed state across
88//! all threads, including on weakly-ordered architectures (ARM, RISC-V, PowerPC).
89//! - **Value Storage**: `OnceLock` provides its own synchronization guarantees.
90//!
91//! ## TOCTOU Protection
92//!
93//! All async listen operations use a double-check pattern to prevent Time-Of-Check-Time-Of-Use
94//! races between checking for a value and waiting for notification.
95//!
96//! # Cancel Safety
97//!
98//! The `listen()` method is cancel-safe: dropping the future will not lose the announced value.
99//! You can safely use it with `select!`, `timeout`, or any operation that might cancel the future.
100//! The value remains available for subsequent `listen()` or `try_listen()` calls.
101
102use std::sync::atomic::{AtomicBool, Ordering};
103use std::sync::{Arc, OnceLock};
104
105use event_listener::Event;
106
107#[cfg(feature = "std")]
108use event_listener::Listener as EventListenerTrait;
109
110#[cfg(feature = "std")]
111use std::time::{Duration, Instant};
112
/// Shared state behind every handle of one announcement channel.
///
/// The announcer, the announcement handle and each listener all point at the
/// same instance of this struct through an `Arc`.
struct AnnouncementInner<T> {
    /// Slot for the announced value; a `OnceLock` accepts at most one write.
    value: OnceLock<T>,
    /// `true` once the announcer has gone away without announcing a value.
    closed: AtomicBool,
}
120
121/// The main announcement channel type that creates listeners.
122///
123/// This is the primary interface for creating listeners that will receive
124/// the announced value. It can be cloned cheaply to share across threads.
125///
126/// # Examples
127///
128/// ```
129/// use announcement::Announcement;
130///
131/// let (announcer, announcement) = Announcement::<i32>::new();
132/// let listener1 = announcement.listener();
133/// let listener2 = announcement.listener();
134///
135/// announcer.announce(42).unwrap();
136///
137/// assert_eq!(listener1.try_listen(), Some(42));
138/// assert_eq!(listener2.try_listen(), Some(42));
139/// ```
140#[must_use = "Announcement should be used to create listeners"]
141#[derive(Clone)]
142pub struct Announcement<T> {
143 inner: Arc<AnnouncementInner<T>>,
144 event: Arc<Event>,
145}
146
147/// The sending end of the announcement channel.
148///
149/// This type is NOT `Clone` - it can only be used once to ensure
150/// that a value can only be announced once.
151///
152/// # Examples
153///
154/// ```
155/// use announcement::Announcement;
156///
157/// let (announcer, announcement) = Announcement::<String>::new();
158/// announcer.announce("Hello!".to_string()).unwrap();
159/// ```
160#[must_use = "Announcer must be used; call announce() or close()"]
161pub struct Announcer<T> {
162 inner: Arc<AnnouncementInner<T>>,
163 event: Arc<Event>,
164}
165
166/// A listener that waits for an announced value.
167///
168/// This type is `Clone` and can be shared across multiple tasks or threads.
169/// All clones will receive the same value when it's announced.
170///
171/// # Examples
172///
173/// ```
174/// use announcement::Announcement;
175///
176/// let (announcer, announcement) = Announcement::new();
177/// let listener = announcement.listener();
178///
179/// announcer.announce(42).unwrap();
180/// assert_eq!(listener.try_listen(), Some(42));
181/// ```
182#[must_use = "Listener must be used; call listen() or try_listen()"]
183#[derive(Clone)]
184pub struct Listener<T> {
185 inner: Arc<AnnouncementInner<T>>,
186 event: Arc<Event>,
187}
188
/// Failure produced when a value cannot be announced.
///
/// # Examples
///
/// ```
/// use announcement::{Announcement, AnnounceError};
///
/// let (announcer, announcement) = Announcement::new();
/// announcer.announce(42).unwrap();
///
/// // A second announce would need a second announcer, which cannot be created,
/// // so the error is built by hand to show how the value is recovered.
/// let err = AnnounceError::AlreadyAnnounced(99);
/// assert_eq!(err.into_inner(), 99);
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AnnounceError<T> {
    /// The channel already holds a value.
    /// The payload is the value that was rejected.
    AlreadyAnnounced(T),
}

impl<T> AnnounceError<T> {
    /// Consume the error and return the value that could not be announced.
    ///
    /// # Examples
    ///
    /// ```
    /// use announcement::AnnounceError;
    ///
    /// let err = AnnounceError::AlreadyAnnounced(42);
    /// assert_eq!(err.into_inner(), 42);
    /// ```
    pub fn into_inner(self) -> T {
        // Irrefutable today; becomes a compile error if a variant is ever added.
        let AnnounceError::AlreadyAnnounced(rejected) = self;
        rejected
    }
}

impl<T> std::fmt::Display for AnnounceError<T> {
    /// Write the human-readable message; the rejected value is not included.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let AnnounceError::AlreadyAnnounced(_) = self;
        f.write_str("a value has already been announced")
    }
}

impl<T: std::fmt::Debug> std::error::Error for AnnounceError<T> {}
239
/// Failure produced while waiting for an announcement.
///
/// # Examples
///
/// ```
/// use announcement::ListenError;
///
/// let err = ListenError::Timeout;
/// assert_eq!(err.to_string(), "timeout elapsed while waiting for announcement");
///
/// let err = ListenError::Closed;
/// assert_eq!(err.to_string(), "announcer was dropped without announcing");
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ListenError {
    /// The timeout ran out before any value arrived.
    Timeout,
    /// The announcer went away without ever announcing a value.
    Closed,
}

impl std::fmt::Display for ListenError {
    /// Write the fixed, human-readable message for the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            ListenError::Timeout => "timeout elapsed while waiting for announcement",
            ListenError::Closed => "announcer was dropped without announcing",
        };
        f.write_str(message)
    }
}

impl std::error::Error for ListenError {}
271
272impl<T: Clone + Send + Sync + 'static> Announcement<T> {
273 /// Create a new announcement channel.
274 ///
275 /// Returns a tuple of `(Announcer, Announcement)`. The announcer is used to
276 /// broadcast the value once, and the announcement is used to create listeners.
277 ///
278 /// # Examples
279 ///
280 /// ```
281 /// use announcement::Announcement;
282 ///
283 /// let (announcer, announcement) = Announcement::<i32>::new();
284 /// ```
285 pub fn new() -> (Announcer<T>, Self) {
286 #[cfg(feature = "tracing")]
287 tracing::trace!("Creating new announcement channel");
288
289 let inner = Arc::new(AnnouncementInner {
290 value: OnceLock::new(),
291 closed: AtomicBool::new(false),
292 });
293 let event = Arc::new(Event::new());
294
295 let announcer = Announcer {
296 inner: Arc::clone(&inner),
297 event: Arc::clone(&event),
298 };
299
300 let announcement = Self { inner, event };
301
302 (announcer, announcement)
303 }
304
305 /// Create a new listener for this announcement.
306 ///
307 /// Can be called before or after announcing. Can be called multiple times
308 /// to create multiple independent listeners that will all receive the same value.
309 ///
310 /// # Performance
311 ///
312 /// O(1), performs 2 Arc clones (~10-20ns)
313 ///
314 /// # Examples
315 ///
316 /// ```
317 /// use announcement::Announcement;
318 ///
319 /// let (announcer, announcement) = Announcement::new();
320 /// let listener1 = announcement.listener();
321 /// let listener2 = announcement.listener();
322 ///
323 /// announcer.announce(42).unwrap();
324 ///
325 /// assert_eq!(listener1.try_listen(), Some(42));
326 /// assert_eq!(listener2.try_listen(), Some(42));
327 /// ```
328 pub fn listener(&self) -> Listener<T> {
329 Listener {
330 inner: Arc::clone(&self.inner),
331 event: Arc::clone(&self.event),
332 }
333 }
334
335 /// Check if a value has been announced.
336 ///
337 /// This is a non-blocking, lock-free operation.
338 ///
339 /// # Performance
340 ///
341 /// O(1), ~5-10ns
342 ///
343 /// # Examples
344 ///
345 /// ```
346 /// use announcement::Announcement;
347 ///
348 /// let (announcer, announcement) = Announcement::<i32>::new();
349 /// assert!(!announcement.is_announced());
350 ///
351 /// announcer.announce(42).unwrap();
352 /// assert!(announcement.is_announced());
353 /// ```
354 #[inline]
355 pub fn is_announced(&self) -> bool {
356 self.inner.value.get().is_some()
357 }
358
359 /// Check if the announcer was closed without announcing.
360 ///
361 /// Returns `true` if the announcer was explicitly closed or dropped without announcing.
362 /// This is a non-blocking, lock-free operation.
363 ///
364 /// # Performance
365 ///
366 /// O(1), ~5-10ns
367 ///
368 /// # Examples
369 ///
370 /// ```
371 /// use announcement::Announcement;
372 ///
373 /// let (announcer, announcement) = Announcement::<i32>::new();
374 /// assert!(!announcement.is_closed());
375 ///
376 /// drop(announcer);
377 /// assert!(announcement.is_closed());
378 /// ```
379 #[inline]
380 pub fn is_closed(&self) -> bool {
381 self.inner.closed.load(Ordering::SeqCst)
382 }
383}
384
385impl<T> Announcer<T> {
386 /// Announce a value to all listeners.
387 ///
388 /// This consumes the announcer, enforcing one-time use. All waiting listeners
389 /// will be woken up and can retrieve the value.
390 ///
391 /// # Returns
392 ///
393 /// - `Ok(())` - Value successfully announced
394 /// - `Err(AnnounceError::AlreadyAnnounced(val))` - Already announced (unreachable)
395 ///
396 /// **Note**: This error is **unreachable** with the public API because `Announcer` is not
397 /// `Clone` and `announce()` consumes `self`, making it impossible to call twice. The error
398 /// exists for soundness (required by `OnceLock::set()`) but will never occur in correct usage.
399 ///
400 /// # Performance
401 ///
402 /// ~10-40ns for the announce operation, plus O(N) time to wake N waiting listeners.
403 /// This is a non-blocking operation.
404 ///
405 /// # Thread Safety
406 ///
407 /// This method is thread-safe and can be called from any thread. The announced value
408 /// will be visible to all listeners across all threads.
409 ///
410 /// # Examples
411 ///
412 /// ```
413 /// use announcement::Announcement;
414 ///
415 /// let (announcer, announcement) = Announcement::new();
416 /// let listener = announcement.listener();
417 ///
418 /// announcer.announce(42).unwrap();
419 /// assert_eq!(listener.try_listen(), Some(42));
420 /// ```
421 pub fn announce(self, val: T) -> Result<(), AnnounceError<T>> {
422 #[cfg(feature = "tracing")]
423 tracing::debug!("Announcing value");
424
425 // Try to set the value (atomic operation)
426 // Note: AlreadyAnnounced is unreachable because Announcer is not Clone
427 // and announce() consumes self, making it impossible to call twice.
428 // The error case is required by OnceLock::set() but cannot occur in practice.
429 // COVERAGE: The .map_err line below is UNREACHABLE and cannot be tested.
430 self.inner
431 .value
432 .set(val)
433 .map_err(AnnounceError::AlreadyAnnounced)?;
434
435 // Wake all waiting listeners
436 #[cfg(feature = "tracing")]
437 tracing::trace!("Notifying all waiting listeners");
438 self.event.notify(usize::MAX);
439
440 #[cfg(feature = "tracing")]
441 tracing::debug!("Value announced successfully");
442
443 Ok(())
444 }
445
446 /// Close the announcer without announcing a value.
447 ///
448 /// This explicitly closes the channel, notifying all listeners that no value
449 /// will be announced. All waiting listeners will wake up and receive
450 /// `Err(ListenError::Closed)`.
451 ///
452 /// **Note on `close()` vs Drop**: This method consumes `self`, so Rust's Drop
453 /// implementation runs immediately afterward. The Drop impl also closes the channel
454 /// if no value was announced, making `close()` and just dropping the announcer
455 /// functionally equivalent. Use `close()` for explicitness and clarity of intent.
456 ///
457 /// # Examples
458 ///
459 /// ```
460 /// use announcement::Announcement;
461 ///
462 /// let (announcer, announcement) = Announcement::<i32>::new();
463 /// let listener = announcement.listener();
464 ///
465 /// announcer.close();
466 /// // Listener would receive Err(ListenError::Closed)
467 /// ```
468 ///
469 /// Dropping the announcer has the same effect:
470 ///
471 /// ```
472 /// use announcement::{Announcement, ListenError};
473 ///
474 /// let (announcer, announcement) = Announcement::<i32>::new();
475 /// let listener = announcement.listener();
476 ///
477 /// drop(announcer); // Auto-closes via Drop impl
478 /// assert_eq!(listener.try_listen(), None);
479 /// assert_eq!(listener.is_closed(), true);
480 /// ```
481 ///
482 /// # Mutation Testing Note
483 ///
484 /// This method is marked with `#[cfg_attr(test, mutants::skip)]` because mutations
485 /// to this function cannot be detected by tests. The reason: `close(self)` consumes
486 /// self, which immediately triggers the Drop implementation. Drop performs identical
487 /// operations (set closed flag + notify listeners), making close() and Drop functionally
488 /// equivalent. Any mutation to close() is masked by Drop's safety net, which is by design.
489 ///
490 /// This is not a testing gap - it reflects the intentional redundancy where Drop serves
491 /// as a guarantee that listeners are never left hanging, even if close() is not called.
492 #[cfg_attr(test, mutants::skip)]
493 pub fn close(self) {
494 #[cfg(feature = "tracing")]
495 tracing::debug!("Closing announcer without announcing");
496
497 // Set the closed flag
498 self.inner.closed.store(true, Ordering::SeqCst);
499
500 // Wake all waiting listeners
501 #[cfg(feature = "tracing")]
502 tracing::trace!("Notifying all waiting listeners of closure");
503 self.event.notify(usize::MAX);
504 }
505}
506
507impl<T> Drop for Announcer<T> {
508 /// Automatically close the channel when the announcer is dropped without announcing.
509 ///
510 /// This ensures listeners don't wait indefinitely if the announcer is dropped.
511 ///
512 /// # Behavior
513 ///
514 /// If neither `announce()` nor `close()` was called, dropping the announcer will:
515 /// 1. Set the closed flag (with Release ordering)
516 /// 2. Wake all waiting listeners
517 /// 3. Cause future `listen()` calls to return `Err(ListenError::Closed)`
518 ///
519 /// If a value was already announced, dropping does nothing (listeners already have the value).
520 ///
521 /// # Example
522 ///
523 /// ```
524 /// use announcement::{Announcement, ListenError};
525 ///
526 /// let (announcer, announcement) = Announcement::<i32>::new();
527 /// let listener = announcement.listener();
528 ///
529 /// // Announcer dropped here without calling announce() or close()
530 /// drop(announcer);
531 ///
532 /// // Listeners are notified of closure
533 /// assert!(announcement.is_closed());
534 /// // Non-blocking check sees no value
535 /// assert_eq!(listener.try_listen(), None);
536 /// ```
537 ///
538 /// # Mutation Testing Note
539 ///
540 /// The Drop implementation is marked with `#[cfg_attr(test, mutants::skip)]` because
541 /// mutations to the guard condition cause test timeouts rather than failures:
542 /// - Replacing the body causes listeners to hang indefinitely
543 /// - Mutating the boolean logic (&&, !) causes listeners to never be notified
544 ///
545 /// These timeouts actually prove the tests would catch the bugs, but at the cost
546 /// of 60+ second hangs per mutation. The existing drop tests provide adequate coverage.
547 #[cfg_attr(test, mutants::skip)]
548 fn drop(&mut self) {
549 // Only close if we haven't announced yet
550 if self.inner.value.get().is_none() && !self.inner.closed.load(Ordering::SeqCst) {
551 #[cfg(feature = "tracing")]
552 tracing::debug!("Announcer dropped without announcing, auto-closing");
553
554 // Set the closed flag
555 self.inner.closed.store(true, Ordering::SeqCst);
556
557 // Wake all waiting listeners
558 #[cfg(feature = "tracing")]
559 tracing::trace!("Notifying all waiting listeners of auto-closure");
560 self.event.notify(usize::MAX);
561 }
562 }
563}
564
565impl<T: Clone> Listener<T> {
566 /// Try to receive without waiting.
567 ///
568 /// Returns `Some(value)` if a value has been announced, or `None` if not yet announced.
569 ///
570 /// # Performance
571 ///
572 /// O(1), ~5-10ns, non-blocking
573 ///
574 /// # Examples
575 ///
576 /// ```
577 /// use announcement::Announcement;
578 ///
579 /// let (announcer, announcement) = Announcement::new();
580 /// let listener = announcement.listener();
581 ///
582 /// assert_eq!(listener.try_listen(), None);
583 ///
584 /// announcer.announce(42).unwrap();
585 /// assert_eq!(listener.try_listen(), Some(42));
586 /// ```
587 #[inline]
588 pub fn try_listen(&self) -> Option<T> {
589 self.inner.value.get().cloned()
590 }
591
592 /// Check if the announcer was closed without announcing.
593 ///
594 /// Returns `true` if the announcer was explicitly closed or dropped without announcing.
595 /// This is a non-blocking, lock-free operation.
596 ///
597 /// # Performance
598 ///
599 /// O(1), ~5-10ns
600 ///
601 /// # Examples
602 ///
603 /// ```
604 /// use announcement::Announcement;
605 ///
606 /// let (announcer, announcement) = Announcement::<i32>::new();
607 /// let listener = announcement.listener();
608 /// assert!(!listener.is_closed());
609 ///
610 /// drop(announcer);
611 /// assert!(listener.is_closed());
612 /// ```
613 #[inline]
614 pub fn is_closed(&self) -> bool {
615 self.inner.closed.load(Ordering::SeqCst)
616 }
617}
618
619impl<T: Clone> Listener<T> {
620 /// Wait for and receive the announced value (async).
621 ///
622 /// Waits until a value is announced or the announcer is closed. Multiple calls
623 /// return the same result.
624 ///
625 /// # Returns
626 ///
627 /// - `Ok(value)` - A value was announced
628 /// - `Err(ListenError::Closed)` - The announcer was dropped or closed without announcing
629 ///
630 /// # Cancel Safety
631 ///
632 /// This method is **cancel-safe**. Dropping the future (e.g., via `select!` or `timeout`)
633 /// will not lose the announced value. The value remains available for subsequent calls
634 /// to `listen()` or `try_listen()`.
635 ///
636 /// You can safely use this method with:
637 /// - `tokio::select!` - Choose between multiple futures
638 /// - `tokio::time::timeout` - Add a timeout
639 /// - Any other cancellation mechanism
640 ///
641 /// # Multiple Calls
642 ///
643 /// Calling `listen()` multiple times (even after it returns) will always return the
644 /// same value. The value is stored in an `Arc<OnceLock>` and cloned for each listener.
645 ///
646 /// # Performance
647 ///
648 /// - Fast path (already announced): ~5-10ns
649 /// - Slow path (waiting): ~100ns + wait time
650 ///
651 /// # Examples
652 ///
653 /// ```
654 /// use announcement::Announcement;
655 ///
656 /// # #[tokio::main]
657 /// # async fn main() {
658 /// let (announcer, announcement) = Announcement::new();
659 /// let listener = announcement.listener();
660 ///
661 /// tokio::spawn(async move {
662 /// announcer.announce(42).unwrap();
663 /// });
664 ///
665 /// let value = listener.listen().await.unwrap();
666 /// assert_eq!(value, 42);
667 /// # }
668 /// ```
669 pub async fn listen(&self) -> Result<T, ListenError> {
670 loop {
671 // Fast path: value already announced
672 if let Some(val) = self.inner.value.get() {
673 #[cfg(feature = "tracing")]
674 tracing::trace!("Value already announced (fast path)");
675 return Ok(val.clone());
676 }
677
678 // Check if closed
679 if self.inner.closed.load(Ordering::SeqCst) {
680 #[cfg(feature = "tracing")]
681 tracing::trace!("Announcer was closed without announcing");
682 return Err(ListenError::Closed);
683 }
684
685 // Slow path: set up listener before waiting
686 #[cfg(feature = "tracing")]
687 tracing::trace!("Waiting for value to be announced");
688
689 let event_listener = self.event.listen();
690
691 // Check again (TOCTOU protection)
692 // This prevents the race where the value is set after we check
693 // but before we register the listener
694 if let Some(val) = self.inner.value.get() {
695 #[cfg(feature = "tracing")]
696 tracing::trace!("Value announced during listener setup");
697 return Ok(val.clone());
698 }
699
700 // Check closed again
701 if self.inner.closed.load(Ordering::SeqCst) {
702 #[cfg(feature = "tracing")]
703 tracing::trace!("Announcer closed during listener setup");
704 return Err(ListenError::Closed);
705 }
706
707 // Wait for notification
708 event_listener.await;
709
710 #[cfg(feature = "tracing")]
711 tracing::trace!("Received notification, checking for value or closure");
712 }
713 }
714}
715
#[cfg(feature = "std")]
impl<T: Clone> Listener<T> {
    /// Snapshot of the channel state used by the blocking waits.
    ///
    /// Returns `Some(Ok(value))` when a value is present, `Some(Err(Closed))`
    /// when the closed flag is set, and `None` while neither has happened. The
    /// value is checked before the flag.
    fn settled_outcome(&self) -> Option<Result<T, ListenError>> {
        if let Some(val) = self.inner.value.get() {
            return Some(Ok(val.clone()));
        }
        if self.inner.closed.load(Ordering::SeqCst) {
            return Some(Err(ListenError::Closed));
        }
        None
    }

    /// Blocking wait for the announced value.
    ///
    /// This method blocks the current thread until a value is announced or the
    /// announcer is closed. FOR NON-ASYNC CONTEXTS ONLY.
    ///
    /// **Requires**: `std` feature (enabled by default)
    ///
    /// # Returns
    ///
    /// - `Ok(value)` - A value was announced
    /// - `Err(ListenError::Closed)` - The announcer was dropped or closed without announcing
    ///
    /// # ⚠️ Warning: Async Context
    ///
    /// **DO NOT** use this method in async functions or tasks. It will block the executor
    /// thread, preventing other tasks from running. This can cause:
    /// - Deadlocks in async runtimes
    /// - Degraded performance
    /// - Runtime warnings or panics
    ///
    /// In async contexts, use [`listen()`](Self::listen) instead.
    ///
    /// # When to Use
    ///
    /// Use this method when:
    /// - You're in a non-async context (regular threads, main function)
    /// - You want to synchronize between threads using standard library threads
    /// - You're interfacing with blocking APIs
    ///
    /// # Thread Safety
    ///
    /// This method is thread-safe and can be called from any thread. Multiple threads
    /// can block on different listeners simultaneously.
    ///
    /// # Examples
    ///
    /// ```
    /// use announcement::Announcement;
    /// use std::thread;
    /// use std::time::Duration;
    ///
    /// let (announcer, announcement) = Announcement::new();
    /// let listener = announcement.listener();
    ///
    /// thread::spawn(move || {
    ///     thread::sleep(Duration::from_millis(10));
    ///     announcer.announce(42).unwrap();
    /// });
    ///
    /// let value = listener.listen_blocking().unwrap();
    /// assert_eq!(value, 42);
    /// ```
    pub fn listen_blocking(&self) -> Result<T, ListenError> {
        loop {
            if let Some(outcome) = self.settled_outcome() {
                return outcome;
            }

            // Register before re-checking so that an announce/close landing
            // in between still wakes this listener.
            let listener = self.event.listen();

            if let Some(outcome) = self.settled_outcome() {
                return outcome;
            }

            // Block current thread until notified, then re-evaluate.
            EventListenerTrait::wait(listener);
        }
    }

    /// Blocking wait with timeout.
    ///
    /// Returns the value if it's announced within the timeout period.
    ///
    /// A `duration` so large that the deadline cannot be represented as an
    /// `Instant` (for example `Duration::MAX`) is treated as "no deadline":
    /// the call then behaves like [`listen_blocking()`](Self::listen_blocking)
    /// instead of panicking on the overflowing addition.
    ///
    /// **Requires**: `std` feature (enabled by default)
    ///
    /// # Returns
    ///
    /// - `Ok(val)` - Value received within timeout
    /// - `Err(ListenError::Timeout)` - Timeout elapsed without receiving a value
    /// - `Err(ListenError::Closed)` - The announcer was dropped or closed without announcing
    ///
    /// # ⚠️ Warning: Async Context
    ///
    /// **DO NOT** use this method in async functions or tasks. It will block the executor
    /// thread. In async contexts, use [`listen()`](Self::listen) with `tokio::time::timeout`
    /// or similar timeout mechanisms instead.
    ///
    /// # Examples
    ///
    /// ```
    /// use announcement::Announcement;
    /// use std::time::Duration;
    ///
    /// let (announcer, announcement) = Announcement::new();
    /// let listener = announcement.listener();
    ///
    /// announcer.announce(42).unwrap();
    ///
    /// let result = listener.listen_timeout_blocking(Duration::from_secs(1));
    /// assert_eq!(result, Ok(42));
    /// ```
    pub fn listen_timeout_blocking(&self, duration: Duration) -> Result<T, ListenError> {
        // `Instant + Duration` panics when the result is not representable;
        // `checked_add` lets us fall back to an unbounded wait instead.
        let Some(deadline) = Instant::now().checked_add(duration) else {
            return self.listen_blocking();
        };

        loop {
            if let Some(outcome) = self.settled_outcome() {
                return outcome;
            }

            // Register before re-checking so that an announce/close landing
            // in between still wakes this listener.
            let listener = self.event.listen();

            if let Some(outcome) = self.settled_outcome() {
                return outcome;
            }

            // Recompute the budget on every pass; a wake-up without a settled
            // state must not restart the full timeout.
            let remaining = deadline.saturating_duration_since(Instant::now());
            if remaining.is_zero() {
                return Err(ListenError::Timeout);
            }

            if EventListenerTrait::wait_timeout(listener, remaining).is_none() {
                return Err(ListenError::Timeout);
            }
        }
    }
}
860
// Send and Sync are automatically derived for our types because:
// - Arc<AnnouncementInner<T>> (an OnceLock<T> plus an AtomicBool) is Send + Sync when T: Send + Sync
// - Arc<Event> is Send + Sync (Event guarantees this)
// No unsafe code needed!
865
#[cfg(test)]
mod tests {
    use super::*;

    /// Verifies: `is_announced()` is `false` before and `true` after announcing.
    /// Category: unit
    #[test]
    fn test_is_announced() {
        let (announcer, announcement) = Announcement::<i32>::new();
        assert!(!announcement.is_announced());
        announcer.announce(42).unwrap();
        assert!(announcement.is_announced());
    }

    /// Verifies: `try_listen()` returns `None` before announce and `Some(value)` after.
    /// Category: unit
    #[test]
    fn test_try_listen_before_announce() {
        let (announcer, announcement) = Announcement::<i32>::new();
        let listener = announcement.listener();
        assert_eq!(listener.try_listen(), None);
        announcer.announce(42).unwrap();
        assert_eq!(listener.try_listen(), Some(42));
    }

    /// Verifies: a listener created after the announce still sees the value.
    /// Category: unit
    #[test]
    fn test_try_listen_after_announce() {
        let (announcer, announcement) = Announcement::<i32>::new();
        announcer.announce(42).unwrap();
        let listener = announcement.listener();
        assert_eq!(listener.try_listen(), Some(42));
    }

    /// Verifies: a cloned listener receives the same value as the original.
    /// Category: unit
    #[test]
    fn test_clone_listener() {
        let (announcer, announcement) = Announcement::<i32>::new();
        let listener1 = announcement.listener();
        let listener2 = listener1.clone();

        announcer.announce(42).unwrap();

        assert_eq!(listener1.try_listen(), Some(42));
        assert_eq!(listener2.try_listen(), Some(42));
    }

    /// Verifies: independently created listeners all receive the same value.
    /// Category: unit
    #[test]
    fn test_multiple_listeners() {
        let (announcer, announcement) = Announcement::<i32>::new();
        let l1 = announcement.listener();
        let l2 = announcement.listener();
        let l3 = announcement.listener();

        announcer.announce(42).unwrap();

        assert_eq!(l1.try_listen(), Some(42));
        assert_eq!(l2.try_listen(), Some(42));
        assert_eq!(l3.try_listen(), Some(42));
    }

    /// Verifies: announcing succeeds even when no listener was ever created.
    /// Category: unit
    #[test]
    fn test_announce_with_no_listeners() {
        let (announcer, _announcement) = Announcement::<i32>::new();
        // No listeners created
        announcer.announce(42).unwrap(); // Should succeed
    }

    /// Verifies: `AnnounceError::into_inner()` returns the rejected value.
    /// Category: unit
    #[test]
    fn test_error_into_inner() {
        let err = AnnounceError::AlreadyAnnounced(99);
        assert_eq!(err.into_inner(), 99);
    }

    /// Verifies: listeners made from a cloned `Announcement` share the value.
    /// Category: unit
    #[test]
    fn test_clone_announcement() {
        let (announcer, announcement) = Announcement::<i32>::new();
        let announcement2 = announcement.clone();

        let l1 = announcement.listener();
        let l2 = announcement2.listener();

        announcer.announce(42).unwrap();

        assert_eq!(l1.try_listen(), Some(42));
        assert_eq!(l2.try_listen(), Some(42));
    }

    /// Verifies: `Arc`-wrapped payloads are shared, not deep-copied, per listener.
    /// Category: unit
    #[test]
    fn test_with_arc() {
        use std::sync::Arc;

        #[derive(Clone, Debug, PartialEq)]
        struct Data {
            value: Vec<u8>,
        }

        let (announcer, announcement) = Announcement::new();
        let data = Arc::new(Data {
            value: vec![1, 2, 3],
        });

        let l1 = announcement.listener();
        let l2 = announcement.listener();

        announcer.announce(data.clone()).unwrap();

        let r1 = l1.try_listen().unwrap();
        let r2 = l2.try_listen().unwrap();

        assert_eq!(r1, data);
        assert_eq!(r2, data);

        // Verify Arc refcount is 4 (original + announcement + r1 + r2)
        assert_eq!(Arc::strong_count(&r1), 4);
    }

    /// Verifies: `AnnounceError` renders its `Display` message.
    /// Category: unit
    #[test]
    fn test_error_display() {
        let err = AnnounceError::AlreadyAnnounced(42);
        assert_eq!(err.to_string(), "a value has already been announced");
    }

    /// Verifies: `ListenError` renders the `Display` message of each variant.
    /// Category: unit
    #[test]
    fn test_listen_error_display() {
        let err = ListenError::Timeout;
        assert_eq!(
            err.to_string(),
            "timeout elapsed while waiting for announcement"
        );

        let err = ListenError::Closed;
        assert_eq!(err.to_string(), "announcer was dropped without announcing");
    }
}
1035}