Skip to main content

laminar_core/streaming/
broadcast.rs

1//! Broadcast Channel for multi-consumer streaming.
2//!
3//! [`BroadcastChannel<T>`] implements a shared ring buffer with per-subscriber cursors
4//! for single-producer, multiple-consumer (SPMC) broadcast. Designed for Ring 0 hot path:
5//! zero allocations after construction, lock-free reads and writes.
6//!
7//! # Design
8//!
9//! - Pre-allocated ring buffer with power-of-2 capacity and bitmask indexing
10//! - **Lock-free hot path**: `broadcast()`, `read()`, `slowest_cursor()` use only atomics
11//! - Fixed-size cursor array with O(1) direct indexing by subscriber ID
12//! - Cache-padded cursor slots (64-byte aligned) to prevent false sharing
13//! - Single producer via [`broadcast()`](BroadcastChannel::broadcast)
14//! - Dynamic subscribers via [`subscribe()`](BroadcastChannel::subscribe) and
15//!   [`unsubscribe()`](BroadcastChannel::unsubscribe)
16//! - Configurable slow subscriber policies: `Block`, `DropSlow`, `SkipForSlow`
17//! - Per-subscriber lag tracking for backpressure monitoring
18//!
19//! # Key Principle
20//!
21//! **Broadcast is derived from query plan analysis, not user configuration.**
22//! The planner determines when multiple MVs read from the same source and
23//! auto-upgrades to broadcast mode.
24//!
25//! # Safety
26//!
27//! The single-writer invariant is upheld by the DAG executor, which ensures
28//! exactly one thread calls `broadcast()` on any given channel. Multiple threads
29//! may call `read()` with distinct subscriber IDs.
30//!
31//! # Performance Targets
32//!
33//! | Operation | Target |
34//! |-----------|--------|
35//! | `broadcast()` | < 100ns (2 subscribers) |
36//! | `read()` | < 50ns |
//! | `subscribe()` | O(1) when the hinted slot is free, O(max_subscribers) worst-case scan; CAS on slot (Ring 2 only) |
38
39use std::cell::UnsafeCell;
40use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
41use std::sync::RwLock;
42use std::time::Duration;
43
44use crate::tpc::CachePadded;
45
/// Default ring-buffer capacity, in slots (already a power of 2).
pub const DEFAULT_BROADCAST_CAPACITY: usize = 1024;

/// Default maximum number of subscribers (one cursor slot is pre-allocated per subscriber).
pub const DEFAULT_MAX_SUBSCRIBERS: usize = 64;

/// Default time the producer waits for a slow subscriber under the `Block` policy.
pub const DEFAULT_SLOW_SUBSCRIBER_TIMEOUT: Duration = Duration::from_millis(100);

/// Default lag (unread messages) at or above which a subscriber is reported as lagging.
pub const DEFAULT_LAG_WARNING_THRESHOLD: u64 = 1000;
57
/// Slow subscriber handling policy.
///
/// Determines what the producer does when the slowest subscriber is a full
/// buffer behind, i.e. the next write would overwrite a slot that subscriber
/// has not read yet.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SlowSubscriberPolicy {
    /// Block producer until slow subscriber catches up (default).
    ///
    /// Best for exactly-once semantics where no data loss is acceptable.
    /// May cause head-of-line blocking if one subscriber is permanently slow;
    /// the wait is bounded by `BroadcastConfig::slow_subscriber_timeout`.
    #[default]
    Block,

    /// Drop the slowest subscriber and continue.
    ///
    /// Best for systems where continuing is more important than any single
    /// subscriber. The dropped subscriber's cursor slot is deactivated, so its
    /// next `read()` returns `None` and `try_read()` returns
    /// `BroadcastError::SubscriberNotFound`.
    DropSlow,

    /// Skip messages for slow subscribers (they lose data).
    ///
    /// Best for real-time systems where freshness matters more than
    /// completeness. The producer overwrites the slot without waiting, so
    /// slow subscribers simply miss events.
    SkipForSlow,
}
83
84/// Broadcast channel configuration.
85#[derive(Debug, Clone)]
86pub struct BroadcastConfig {
87    /// Buffer capacity (will be rounded to power of 2).
88    pub capacity: usize,
89
90    /// Maximum allowed subscribers.
91    pub max_subscribers: usize,
92
93    /// Policy when slowest subscriber is too far behind.
94    pub slow_subscriber_policy: SlowSubscriberPolicy,
95
96    /// Timeout for blocking on slow subscriber (Block policy).
97    pub slow_subscriber_timeout: Duration,
98
99    /// Lag threshold for warnings.
100    pub lag_warning_threshold: u64,
101}
102
103impl Default for BroadcastConfig {
104    fn default() -> Self {
105        Self {
106            capacity: DEFAULT_BROADCAST_CAPACITY,
107            max_subscribers: DEFAULT_MAX_SUBSCRIBERS,
108            slow_subscriber_policy: SlowSubscriberPolicy::Block,
109            slow_subscriber_timeout: DEFAULT_SLOW_SUBSCRIBER_TIMEOUT,
110            lag_warning_threshold: DEFAULT_LAG_WARNING_THRESHOLD,
111        }
112    }
113}
114
115impl BroadcastConfig {
116    /// Creates a new configuration with the specified capacity.
117    #[must_use]
118    pub fn with_capacity(capacity: usize) -> Self {
119        Self {
120            capacity,
121            ..Default::default()
122        }
123    }
124
125    /// Creates a builder for custom configuration.
126    #[must_use]
127    pub fn builder() -> BroadcastConfigBuilder {
128        BroadcastConfigBuilder::default()
129    }
130
131    /// Returns the effective capacity (rounded to power of 2).
132    #[must_use]
133    pub fn effective_capacity(&self) -> usize {
134        self.capacity.max(4).next_power_of_two()
135    }
136}
137
138/// Builder for [`BroadcastConfig`].
139#[derive(Debug, Default)]
140pub struct BroadcastConfigBuilder {
141    capacity: Option<usize>,
142    max_subscribers: Option<usize>,
143    slow_subscriber_policy: Option<SlowSubscriberPolicy>,
144    slow_subscriber_timeout: Option<Duration>,
145    lag_warning_threshold: Option<u64>,
146}
147
148impl BroadcastConfigBuilder {
149    /// Sets the buffer capacity.
150    #[must_use]
151    pub fn capacity(mut self, capacity: usize) -> Self {
152        self.capacity = Some(capacity);
153        self
154    }
155
156    /// Sets the maximum number of subscribers.
157    #[must_use]
158    pub fn max_subscribers(mut self, max: usize) -> Self {
159        self.max_subscribers = Some(max);
160        self
161    }
162
163    /// Sets the slow subscriber policy.
164    #[must_use]
165    pub fn slow_subscriber_policy(mut self, policy: SlowSubscriberPolicy) -> Self {
166        self.slow_subscriber_policy = Some(policy);
167        self
168    }
169
170    /// Sets the slow subscriber timeout (for Block policy).
171    #[must_use]
172    pub fn slow_subscriber_timeout(mut self, timeout: Duration) -> Self {
173        self.slow_subscriber_timeout = Some(timeout);
174        self
175    }
176
177    /// Sets the lag warning threshold.
178    #[must_use]
179    pub fn lag_warning_threshold(mut self, threshold: u64) -> Self {
180        self.lag_warning_threshold = Some(threshold);
181        self
182    }
183
184    /// Builds the configuration.
185    #[must_use]
186    pub fn build(self) -> BroadcastConfig {
187        BroadcastConfig {
188            capacity: self.capacity.unwrap_or(DEFAULT_BROADCAST_CAPACITY),
189            max_subscribers: self.max_subscribers.unwrap_or(DEFAULT_MAX_SUBSCRIBERS),
190            slow_subscriber_policy: self.slow_subscriber_policy.unwrap_or_default(),
191            slow_subscriber_timeout: self
192                .slow_subscriber_timeout
193                .unwrap_or(DEFAULT_SLOW_SUBSCRIBER_TIMEOUT),
194            lag_warning_threshold: self
195                .lag_warning_threshold
196                .unwrap_or(DEFAULT_LAG_WARNING_THRESHOLD),
197        }
198    }
199}
200
/// Broadcast channel errors.
#[derive(Debug, thiserror::Error)]
pub enum BroadcastError {
    /// Maximum subscribers reached.
    ///
    /// Returned by `subscribe()` when every cursor slot is occupied; the
    /// payload is the total number of slots.
    #[error("maximum subscribers ({0}) reached")]
    MaxSubscribersReached(usize),

    /// Slow subscriber timeout.
    ///
    /// Returned by `broadcast()` under the `Block` policy when the slowest
    /// subscriber did not free a slot within the configured timeout (the payload).
    #[error("slow subscriber timeout after {0:?}")]
    SlowSubscriberTimeout(Duration),

    /// No active subscribers.
    ///
    /// Returned by `broadcast()` when no cursor slot is active.
    #[error("no active subscribers")]
    NoSubscribers,

    /// Subscriber not found.
    ///
    /// Returned by `try_read()` when the ID is out of range or the slot is
    /// inactive; the payload is the offending ID.
    #[error("subscriber {0} not found")]
    SubscriberNotFound(usize),

    /// Buffer full (used internally).
    // NOTE(review): not constructed anywhere in the visible part of this file.
    #[error("buffer full")]
    BufferFull,

    /// Channel closed.
    ///
    /// Returned by `broadcast()` after `close()` has been called.
    #[error("channel closed")]
    Closed,
}
228
/// One subscriber's read cursor, padded out to a 64-byte cache line.
///
/// The 64-byte alignment keeps each slot on its own cache line so that
/// subscribers running on different cores do not false-share. All state is
/// held in atomics, so every method takes `&self`.
///
/// # Memory Layout
///
/// ```text
/// Offset  Field       Size
/// 0       active      1 byte (AtomicBool)
/// 1-7     (padding)   7 bytes
/// 8       read_seq    8 bytes (AtomicU64)
/// 16-63   _pad        48 bytes
/// Total: 64 bytes (one cache line)
/// ```
#[repr(C, align(64))]
struct CursorSlot {
    /// `true` while a subscriber owns this slot.
    active: AtomicBool,
    /// Sequence number of the next message this subscriber will read.
    read_seq: AtomicU64,
    /// Explicit filler so the struct occupies the whole cache line.
    _pad: [u8; 48],
}

impl CursorSlot {
    /// Builds an unowned slot with its cursor at sequence 0.
    const fn empty() -> Self {
        Self {
            active: AtomicBool::new(false),
            read_seq: AtomicU64::new(0),
            _pad: [0; 48],
        }
    }

    /// Attempts to take ownership of the slot and position its cursor at
    /// `start_seq`.
    ///
    /// Returns `true` when this call flipped the slot from inactive to
    /// active, `false` when it was already owned (the cursor is then left
    /// untouched).
    // NOTE(review): the slot becomes visible as active before `read_seq` is
    // updated, so a concurrent scan can briefly observe the previous owner's
    // cursor value for a freshly claimed slot.
    #[inline]
    fn try_claim(&self, start_seq: u64) -> bool {
        let claimed = self
            .active
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok();
        if claimed {
            self.read_seq.store(start_seq, Ordering::Release);
        }
        claimed
    }

    /// Reports whether a subscriber currently owns this slot.
    #[inline]
    fn is_active(&self) -> bool {
        self.active.load(Ordering::Acquire)
    }

    /// Releases the slot so it can be claimed again; the cursor value is kept.
    #[inline]
    fn deactivate(&self) {
        self.active.store(false, Ordering::Release);
    }

    /// Returns the current cursor (sequence of the next unread message).
    #[inline]
    fn read_position(&self) -> u64 {
        self.read_seq.load(Ordering::Acquire)
    }
}

impl Default for CursorSlot {
    /// Same as [`CursorSlot::empty`].
    fn default() -> Self {
        Self::empty()
    }
}
304
/// Broadcast channel for multi-consumer scenarios.
///
/// Uses a shared ring buffer with per-subscriber cursors for memory efficiency.
/// Slowest consumer determines retention. Supports dynamic subscribe/unsubscribe.
///
/// # Lock-Free Hot Path
///
/// The hot path methods (`broadcast`, `read`, `slowest_cursor`) are completely
/// lock-free, using only atomic operations on the pre-allocated cursor slots.
/// This achieves consistent sub-100ns latency without the 5-50μs spikes that
/// `RwLock` can cause.
///
/// # Type Parameters
///
/// * `T` - The event type. Must be `Clone` for subscribers (typically
///   `Arc<RecordBatch>` where clone is an O(1) atomic increment).
///
/// # Performance Targets
///
/// | Operation | Target |
/// |-----------|--------|
/// | `broadcast()` (2 subs) | < 100ns |
/// | `broadcast()` (4 subs) | < 150ns |
/// | `read()` | < 50ns |
///
/// # Example
///
/// ```rust,ignore
/// use laminar_core::streaming::broadcast::{BroadcastChannel, BroadcastConfig};
///
/// let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());
///
/// // Subscribe
/// let sub1 = channel.subscribe("mv1").unwrap();
/// let sub2 = channel.subscribe("mv2").unwrap();
///
/// // Broadcast
/// channel.broadcast(42).unwrap();
///
/// // Each subscriber receives the value
/// assert_eq!(channel.read(sub1), Some(42));
/// assert_eq!(channel.read(sub2), Some(42));
/// ```
pub struct BroadcastChannel<T> {
    /// Shared ring buffer (pre-allocated slots).
    /// A slot holds `None` until the producer has written to it at least once.
    buffer: Box<[UnsafeCell<Option<T>>]>,

    /// Write sequence (single producer, monotonically increasing).
    /// Cache-padded to prevent false sharing with read cursors.
    /// Equals the sequence number the next `broadcast()` will write.
    write_seq: CachePadded<AtomicU64>,

    /// Pre-allocated cursor slots (lock-free hot path access).
    /// Indexed directly by `subscriber_id` for O(1) lookup.
    /// Each slot is 64-byte cache-line aligned.
    cursor_slots: Box<[CursorSlot]>,

    /// Subscriber names (only accessed during setup/debug, not hot path).
    /// Indexed by `subscriber_id`. Protected by `RwLock`: written by
    /// `subscribe()`, read by `subscriber_info()` and `list_subscribers()`.
    cursor_names: RwLock<Vec<String>>,

    /// Number of active subscribers (atomically maintained).
    /// Used for quick `subscriber_count()` without scanning.
    active_count: AtomicUsize,

    /// Hint for next slot to try during `subscribe()`.
    /// Not critical for correctness - just optimization.
    next_slot_hint: AtomicUsize,

    /// Configuration.
    config: BroadcastConfig,

    /// Capacity (power of 2); this is the effective capacity, not the
    /// requested `config.capacity`.
    capacity: usize,

    /// Bitmask for modular indexing (`capacity - 1`).
    mask: usize,

    /// Whether the channel is closed.
    closed: AtomicBool,
}
386
387// SAFETY: BroadcastChannel is designed for SPMC (single-producer, multi-consumer):
388// - Single writer thread calls broadcast() (enforced by DAG executor)
389// - Multiple consumer threads call read() with distinct subscriber IDs
390// - All shared state uses atomic operations with appropriate memory ordering
391// - UnsafeCell access is guarded by write_seq/cursor synchronization
392unsafe impl<T: Send> Send for BroadcastChannel<T> {}
393// SAFETY: See above. Consumers access distinct cursor entries.
394// Slot reads are protected by the write_seq protocol.
395unsafe impl<T: Send> Sync for BroadcastChannel<T> {}
396
397impl<T> BroadcastChannel<T> {
398    /// Creates a new broadcast channel with the given configuration.
399    ///
400    /// Pre-allocates all cursor slots for lock-free hot path operation.
401    ///
402    /// # Arguments
403    ///
404    /// * `config` - Channel configuration
405    ///
406    /// # Example
407    ///
408    /// ```rust,ignore
409    /// let channel = BroadcastChannel::<i32>::new(BroadcastConfig::with_capacity(256));
410    /// ```
411    #[must_use]
412    pub fn new(config: BroadcastConfig) -> Self {
413        let capacity = config.effective_capacity();
414        let mask = capacity - 1;
415        let max_subscribers = config.max_subscribers;
416
417        let buffer: Vec<UnsafeCell<Option<T>>> =
418            (0..capacity).map(|_| UnsafeCell::new(None)).collect();
419
420        // Pre-allocate cursor slots (64-byte aligned each)
421        let cursor_slots: Vec<CursorSlot> =
422            (0..max_subscribers).map(|_| CursorSlot::empty()).collect();
423
424        // Pre-allocate name storage
425        let cursor_names: Vec<String> = (0..max_subscribers).map(|_| String::new()).collect();
426
427        Self {
428            buffer: buffer.into_boxed_slice(),
429            write_seq: CachePadded::new(AtomicU64::new(0)),
430            cursor_slots: cursor_slots.into_boxed_slice(),
431            cursor_names: RwLock::new(cursor_names),
432            active_count: AtomicUsize::new(0),
433            next_slot_hint: AtomicUsize::new(0),
434            config,
435            capacity,
436            mask,
437            closed: AtomicBool::new(false),
438        }
439    }
440
441    /// Returns the slowest cursor position among active subscribers.
442    ///
443    /// **Lock-free**: Only uses atomic loads, no locks.
444    ///
445    /// Returns `u64::MAX` if there are no active subscribers.
446    #[must_use]
447    pub fn slowest_cursor(&self) -> u64 {
448        let mut min_pos = u64::MAX;
449        for slot in &*self.cursor_slots {
450            if slot.is_active() {
451                let pos = slot.read_position();
452                if pos < min_pos {
453                    min_pos = pos;
454                }
455            }
456        }
457        min_pos
458    }
459
460    /// Returns the lag (unread messages) for a subscriber.
461    ///
462    /// **Lock-free**: Only uses atomic loads, no locks.
463    ///
464    /// Returns 0 if the subscriber ID is out of bounds or inactive.
465    #[must_use]
466    pub fn subscriber_lag(&self, subscriber_id: usize) -> u64 {
467        if subscriber_id >= self.cursor_slots.len() {
468            return 0;
469        }
470        let slot = &self.cursor_slots[subscriber_id];
471        if !slot.is_active() {
472            return 0;
473        }
474        let write_pos = self.write_seq.load(Ordering::Acquire);
475        let read_pos = slot.read_position();
476        write_pos.saturating_sub(read_pos)
477    }
478
    /// Returns the number of active subscribers.
    ///
    /// **Lock-free**: Returns the atomically-maintained count; the cursor
    /// slots are not scanned, so the value can momentarily differ from the
    /// number of slots whose `active` flag is set.
    #[must_use]
    pub fn subscriber_count(&self) -> usize {
        self.active_count.load(Ordering::Acquire)
    }
486
487    /// Returns true if the subscriber is lagging beyond the warning threshold.
488    ///
489    /// **Lock-free**: Only uses atomic loads.
490    #[must_use]
491    pub fn is_lagging(&self, subscriber_id: usize) -> bool {
492        self.subscriber_lag(subscriber_id) >= self.config.lag_warning_threshold
493    }
494
    /// Returns the current write position, i.e. the sequence number the next
    /// `broadcast()` will use.
    ///
    /// Uses a `Relaxed` load, so the value is a snapshot only and does not
    /// synchronize with the buffered data.
    #[must_use]
    pub fn write_position(&self) -> u64 {
        self.write_seq.load(Ordering::Relaxed)
    }
500
    /// Returns the buffer capacity in slots.
    ///
    /// This is the effective (power-of-2) capacity computed at construction,
    /// which may be larger than the `capacity` requested in the configuration.
    #[must_use]
    pub fn capacity(&self) -> usize {
        self.capacity
    }
506
    /// Returns the configuration this channel was constructed with.
    #[must_use]
    pub fn config(&self) -> &BroadcastConfig {
        &self.config
    }
512
    /// Returns true if the channel is closed, i.e. `close()` has been called.
    #[must_use]
    pub fn is_closed(&self) -> bool {
        self.closed.load(Ordering::Acquire)
    }
518
    /// Closes the channel.
    ///
    /// After closing, `broadcast()` returns `Err(Closed)` and subscribers
    /// can only read remaining buffered data. Closing only sets a flag:
    /// buffered values and subscriber slots are left untouched.
    pub fn close(&self) {
        self.closed.store(true, Ordering::Release);
    }
526
527    /// Returns subscriber information for debugging.
528    ///
529    /// # Note
530    ///
531    /// Takes a read lock to access the name. Not intended for hot path use.
532    ///
533    /// # Panics
534    ///
535    /// Panics if the internal lock is poisoned (should not happen in normal use).
536    #[must_use]
537    pub fn subscriber_info(&self, subscriber_id: usize) -> Option<SubscriberInfo> {
538        if subscriber_id >= self.cursor_slots.len() {
539            return None;
540        }
541        let slot = &self.cursor_slots[subscriber_id];
542        let names = self.cursor_names.read().unwrap();
543        let write_pos = self.write_seq.load(Ordering::Acquire);
544        let read_pos = slot.read_position();
545        let active = slot.is_active();
546
547        // Return info even for inactive slots (for debugging)
548        Some(SubscriberInfo {
549            id: subscriber_id,
550            name: names[subscriber_id].clone(),
551            active,
552            read_position: read_pos,
553            lag: write_pos.saturating_sub(read_pos),
554        })
555    }
556
557    /// Lists all active subscribers.
558    ///
559    /// # Note
560    ///
561    /// Takes a read lock to access names. Not intended for hot path use.
562    ///
563    /// # Panics
564    ///
565    /// Panics if the internal lock is poisoned (should not happen in normal use).
566    #[must_use]
567    pub fn list_subscribers(&self) -> Vec<SubscriberInfo> {
568        let names = self.cursor_names.read().unwrap();
569        let write_pos = self.write_seq.load(Ordering::Acquire);
570
571        self.cursor_slots
572            .iter()
573            .enumerate()
574            .filter(|(_, slot)| slot.is_active())
575            .map(|(id, slot)| {
576                let read_pos = slot.read_position();
577                SubscriberInfo {
578                    id,
579                    name: names[id].clone(),
580                    active: true,
581                    read_position: read_pos,
582                    lag: write_pos.saturating_sub(read_pos),
583                }
584            })
585            .collect()
586    }
587
588    /// Calculates slot index from sequence number.
589    #[inline]
590    fn slot_index(&self, seq: u64) -> usize {
591        // Bitmask truncates to capacity range, so u64->usize narrowing is safe.
592        #[allow(clippy::cast_possible_truncation)]
593        let idx = (seq as usize) & self.mask;
594        idx
595    }
596
597    /// Unsubscribes a subscriber.
598    ///
599    /// **Lock-free**: Only uses atomic store on the slot.
600    ///
601    /// The subscriber's cursor is deactivated but the slot remains allocated
602    /// (can be reused by future subscribers). Subsequent reads with this ID
603    /// will return `None`.
604    pub fn unsubscribe(&self, subscriber_id: usize) {
605        if subscriber_id < self.cursor_slots.len() {
606            let slot = &self.cursor_slots[subscriber_id];
607            if slot.is_active() {
608                slot.deactivate();
609                self.active_count.fetch_sub(1, Ordering::Release);
610            }
611        }
612    }
613}
614
615impl<T: Clone> BroadcastChannel<T> {
    /// Broadcasts a value to all subscribers.
    ///
    /// **Lock-free**: Only uses atomic operations, no locks on hot path.
    /// (Under the `Block` policy the call may still busy-wait for a slow
    /// subscriber, up to the configured timeout.)
    ///
    /// Writes the value into the next available slot. All active subscribers
    /// will be able to read this value via [`read()`](Self::read).
    ///
    /// # Errors
    ///
    /// - [`BroadcastError::NoSubscribers`] if there are no active subscribers
    /// - [`BroadcastError::SlowSubscriberTimeout`] if Block policy times out
    /// - [`BroadcastError::Closed`] if the channel is closed
    ///
    /// On any error the value is dropped and the write position is unchanged.
    ///
    /// # Safety Contract
    ///
    /// Must be called from a single writer thread only. The DAG executor
    /// enforces this by assigning exactly one producer per broadcast channel.
    pub fn broadcast(&self, value: T) -> Result<(), BroadcastError> {
        if self.closed.load(Ordering::Acquire) {
            return Err(BroadcastError::Closed);
        }

        // Relaxed is sufficient: this thread is the only writer of write_seq.
        let write_pos = self.write_seq.load(Ordering::Relaxed);
        let slot_idx = self.slot_index(write_pos);

        // Check if we need to wait for slow subscribers (lock-free)
        let min_read = self.slowest_cursor();
        if min_read == u64::MAX {
            return Err(BroadcastError::NoSubscribers);
        }

        // Check if the target slot would overwrite unread data
        // (the slowest subscriber is a full buffer behind the producer).
        if write_pos >= min_read + self.capacity as u64 {
            self.handle_slow_subscriber(write_pos)?;
        }

        // SAFETY: Single writer guarantees exclusive write access to this slot.
        // The slot is available because we've either waited for slow subscribers
        // or applied the configured policy.
        // NOTE(review): under `SkipForSlow` the policy handler returns without
        // moving any cursor, and under `DropSlow` a reader that already passed
        // its `is_active()` check is not stopped, so a lagging subscriber may
        // still be cloning out of this slot while it is overwritten - confirm
        // those policies are acceptable for the `T` in use.
        unsafe { *self.buffer[slot_idx].get() = Some(value) };

        // Advance write position (Release makes new data visible to consumers).
        self.write_seq.store(write_pos + 1, Ordering::Release);

        Ok(())
    }
662
663    /// Registers a new subscriber.
664    ///
665    /// Returns the subscriber ID which can be used with [`read()`](Self::read).
666    /// The subscriber ID is the slot index for O(1) direct access on the hot path.
667    ///
668    /// New subscribers start reading from the current write position (they don't
669    /// see historical data).
670    ///
671    /// # Errors
672    ///
673    /// Returns [`BroadcastError::MaxSubscribersReached`] if all slots are occupied.
674    ///
675    /// # Performance
676    ///
677    /// Uses CAS to claim a slot, then takes write lock for name storage.
678    /// Should only be called during setup, not on hot path.
679    ///
680    /// # Panics
681    ///
682    /// Panics if the internal lock is poisoned (should not happen in normal use).
683    pub fn subscribe(&self, name: impl Into<String>) -> Result<usize, BroadcastError> {
684        let start_seq = self.write_seq.load(Ordering::Acquire);
685        let max_slots = self.cursor_slots.len();
686
687        // Start from hint and scan for an available slot
688        let hint = self.next_slot_hint.load(Ordering::Relaxed) % max_slots;
689
690        // Try to claim a slot using CAS (lock-free)
691        for offset in 0..max_slots {
692            let slot_id = (hint + offset) % max_slots;
693            let slot = &self.cursor_slots[slot_id];
694
695            if slot.try_claim(start_seq) {
696                // Successfully claimed slot - now store the name
697                {
698                    let mut names = self.cursor_names.write().unwrap();
699                    names[slot_id] = name.into();
700                }
701
702                // Update hint for next subscribe
703                self.next_slot_hint
704                    .store((slot_id + 1) % max_slots, Ordering::Relaxed);
705
706                // Increment active count
707                self.active_count.fetch_add(1, Ordering::Release);
708
709                return Ok(slot_id);
710            }
711        }
712
713        // All slots occupied
714        Err(BroadcastError::MaxSubscribersReached(max_slots))
715    }
716
717    /// Reads the next value for a subscriber.
718    ///
719    /// **Lock-free**: Uses direct O(1) array indexing, no locks.
720    ///
721    /// Returns `Some(value)` if data is available, or `None` if the subscriber
722    /// is caught up with the producer, has been unsubscribed, or the ID is invalid.
723    ///
724    /// # Arguments
725    ///
726    /// * `subscriber_id` - The subscriber's ID from [`subscribe()`](Self::subscribe)
727    #[inline]
728    pub fn read(&self, subscriber_id: usize) -> Option<T> {
729        // O(1) direct indexing - no lock, no linear scan
730        if subscriber_id >= self.cursor_slots.len() {
731            return None;
732        }
733
734        let slot = &self.cursor_slots[subscriber_id];
735
736        if !slot.is_active() {
737            return None;
738        }
739
740        let read_pos = slot.read_seq.load(Ordering::Relaxed);
741        let write_pos = self.write_seq.load(Ordering::Acquire);
742
743        if read_pos >= write_pos {
744            return None; // No data available
745        }
746
747        let buffer_idx = self.slot_index(read_pos);
748
749        // SAFETY: write_pos > read_pos guarantees this slot contains valid data.
750        // The Acquire load of write_seq above synchronizes-with the Release store
751        // in broadcast(), ensuring the slot value is visible.
752        let value = unsafe { (*self.buffer[buffer_idx].get()).as_ref()?.clone() };
753
754        // Advance read position
755        slot.read_seq.store(read_pos + 1, Ordering::Release);
756
757        Some(value)
758    }
759
760    /// Tries to read without blocking.
761    ///
762    /// **Lock-free**: Inlined logic, no double-locking.
763    ///
764    /// Returns `Ok(Some(value))` if data is available, `Ok(None)` if caught up,
765    /// or `Err` if the subscriber is invalid or unsubscribed.
766    ///
767    /// # Errors
768    ///
769    /// Returns [`BroadcastError::SubscriberNotFound`] if the subscriber ID is invalid
770    /// or the subscriber has been unsubscribed.
771    #[inline]
772    pub fn try_read(&self, subscriber_id: usize) -> Result<Option<T>, BroadcastError> {
773        // O(1) direct indexing - no lock, no linear scan
774        if subscriber_id >= self.cursor_slots.len() {
775            return Err(BroadcastError::SubscriberNotFound(subscriber_id));
776        }
777
778        let slot = &self.cursor_slots[subscriber_id];
779
780        if !slot.is_active() {
781            return Err(BroadcastError::SubscriberNotFound(subscriber_id));
782        }
783
784        let read_pos = slot.read_seq.load(Ordering::Relaxed);
785        let write_pos = self.write_seq.load(Ordering::Acquire);
786
787        if read_pos >= write_pos {
788            return Ok(None); // Caught up
789        }
790
791        let buffer_idx = self.slot_index(read_pos);
792
793        // SAFETY: write_pos > read_pos guarantees this slot contains valid data.
794        let value = unsafe { (*self.buffer[buffer_idx].get()).clone() };
795
796        if value.is_some() {
797            // Advance read position
798            slot.read_seq.store(read_pos + 1, Ordering::Release);
799        }
800
801        Ok(value)
802    }
803
804    /// Handles slow subscriber based on policy.
805    fn handle_slow_subscriber(&self, target_write: u64) -> Result<(), BroadcastError> {
806        match self.config.slow_subscriber_policy {
807            SlowSubscriberPolicy::Block => self.wait_for_slowest(target_write),
808            SlowSubscriberPolicy::DropSlow => {
809                self.drop_slowest_subscriber();
810                Ok(())
811            }
812            SlowSubscriberPolicy::SkipForSlow => {
813                // Just overwrite - slow subscribers will skip ahead
814                Ok(())
815            }
816        }
817    }
818
819    /// Waits for the slowest subscriber to catch up (Block policy).
820    fn wait_for_slowest(&self, target_write: u64) -> Result<(), BroadcastError> {
821        let start = std::time::Instant::now();
822        let timeout = self.config.slow_subscriber_timeout;
823
824        loop {
825            let min_read = self.slowest_cursor();
826            if min_read == u64::MAX {
827                return Err(BroadcastError::NoSubscribers);
828            }
829
830            // Check if we have room
831            if target_write < min_read + self.capacity as u64 {
832                return Ok(());
833            }
834
835            // Check timeout
836            if start.elapsed() >= timeout {
837                return Err(BroadcastError::SlowSubscriberTimeout(timeout));
838            }
839
840            // Yield to allow slow subscriber to make progress
841            std::hint::spin_loop();
842        }
843    }
844
845    /// Drops the slowest subscriber (`DropSlow` policy).
846    ///
847    /// **Lock-free**: Scans slots atomically without locks.
848    fn drop_slowest_subscriber(&self) {
849        // Find the slowest active subscriber (lock-free scan)
850        let mut slowest_id: Option<usize> = None;
851        let mut slowest_pos = u64::MAX;
852
853        for (id, slot) in self.cursor_slots.iter().enumerate() {
854            if slot.is_active() {
855                let pos = slot.read_position();
856                if pos < slowest_pos {
857                    slowest_pos = pos;
858                    slowest_id = Some(id);
859                }
860            }
861        }
862
863        if let Some(id) = slowest_id {
864            self.cursor_slots[id].deactivate();
865            self.active_count.fetch_sub(1, Ordering::Release);
866        }
867    }
868}
869
870impl<T> std::fmt::Debug for BroadcastChannel<T> {
871    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
872        f.debug_struct("BroadcastChannel")
873            .field("capacity", &self.capacity)
874            .field("write_position", &self.write_position())
875            .field("subscriber_count", &self.subscriber_count())
876            .field("slowest_cursor", &self.slowest_cursor())
877            .field("closed", &self.is_closed())
878            .finish_non_exhaustive()
879    }
880}
881
/// Information about a subscriber.
///
/// A point-in-time snapshot produced by `subscriber_info()` and
/// `list_subscribers()`; it is not updated after creation.
#[derive(Debug, Clone)]
pub struct SubscriberInfo {
    /// Subscriber ID (the index of the subscriber's cursor slot).
    pub id: usize,
    /// Subscriber name, as passed to `subscribe()`.
    pub name: String,
    /// Whether the subscriber is active.
    pub active: bool,
    /// Current read position (sequence number of the next unread message).
    pub read_position: u64,
    /// Lag (unread messages): write position minus read position, saturating at 0.
    pub lag: u64,
}
896
897// ===========================================================================
898// Tests
899// ===========================================================================
900
#[cfg(test)]
// `map_or(true, ..)` is used in `test_drop_slow_policy`; presumably kept over
// `Option::is_none_or` for compatibility with older toolchains (TODO confirm MSRV).
#[allow(clippy::unnecessary_map_or)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;
    use std::time::Instant;

    /// Upper bound on how long a consumer loop may wait for its messages.
    ///
    /// Deliberately generous: it only exists so that a stalled or panicked
    /// producer fails the test instead of hanging it forever.
    const READ_DEADLINE: Duration = Duration::from_secs(30);

    /// Polls `channel.read(id)` until `expected` values have arrived and
    /// returns them in arrival order.
    ///
    /// Yields to the scheduler whenever nothing is available, and panics if
    /// `READ_DEADLINE` elapses before all values were received.
    fn collect_values(channel: &BroadcastChannel<i32>, id: usize, expected: usize) -> Vec<i32> {
        let started = Instant::now();
        let mut received = Vec::with_capacity(expected);

        while received.len() < expected {
            if let Some(val) = channel.read(id) {
                received.push(val);
            } else {
                assert!(
                    started.elapsed() < READ_DEADLINE,
                    "subscriber {id} received only {} of {expected} values before the deadline",
                    received.len()
                );
                thread::yield_now();
            }
        }

        received
    }

    // --- Configuration tests ---

    #[test]
    fn test_default_config() {
        let config = BroadcastConfig::default();
        assert_eq!(config.capacity, DEFAULT_BROADCAST_CAPACITY);
        assert_eq!(config.max_subscribers, DEFAULT_MAX_SUBSCRIBERS);
        assert_eq!(config.slow_subscriber_policy, SlowSubscriberPolicy::Block);
        assert_eq!(
            config.slow_subscriber_timeout,
            DEFAULT_SLOW_SUBSCRIBER_TIMEOUT
        );
        assert_eq!(config.lag_warning_threshold, DEFAULT_LAG_WARNING_THRESHOLD);
    }

    #[test]
    fn test_config_with_capacity() {
        let config = BroadcastConfig::with_capacity(256);
        assert_eq!(config.capacity, 256);
        assert_eq!(config.effective_capacity(), 256);
    }

    #[test]
    fn test_config_effective_capacity_rounds_up() {
        let config = BroadcastConfig::with_capacity(100);
        assert_eq!(config.effective_capacity(), 128); // Next power of 2

        let config = BroadcastConfig::with_capacity(1);
        assert_eq!(config.effective_capacity(), 4); // Minimum is 4
    }

    #[test]
    fn test_config_builder() {
        let config = BroadcastConfig::builder()
            .capacity(512)
            .max_subscribers(8)
            .slow_subscriber_policy(SlowSubscriberPolicy::DropSlow)
            .slow_subscriber_timeout(Duration::from_secs(1))
            .lag_warning_threshold(500)
            .build();

        assert_eq!(config.capacity, 512);
        assert_eq!(config.max_subscribers, 8);
        assert_eq!(
            config.slow_subscriber_policy,
            SlowSubscriberPolicy::DropSlow
        );
        assert_eq!(config.slow_subscriber_timeout, Duration::from_secs(1));
        assert_eq!(config.lag_warning_threshold, 500);
    }

    #[test]
    fn test_slow_subscriber_policy_default() {
        let policy = SlowSubscriberPolicy::default();
        assert_eq!(policy, SlowSubscriberPolicy::Block);
    }

    #[test]
    fn test_slow_subscriber_policy_variants() {
        assert_eq!(SlowSubscriberPolicy::Block, SlowSubscriberPolicy::Block);
        assert_ne!(SlowSubscriberPolicy::Block, SlowSubscriberPolicy::DropSlow);
        assert_ne!(
            SlowSubscriberPolicy::Block,
            SlowSubscriberPolicy::SkipForSlow
        );
    }

    // --- BroadcastChannel creation tests ---

    #[test]
    fn test_channel_creation() {
        let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());
        assert_eq!(channel.capacity(), DEFAULT_BROADCAST_CAPACITY);
        assert_eq!(channel.subscriber_count(), 0);
        assert_eq!(channel.write_position(), 0);
        assert!(!channel.is_closed());
    }

    #[test]
    fn test_channel_custom_capacity() {
        let config = BroadcastConfig::with_capacity(64);
        let channel = BroadcastChannel::<i32>::new(config);
        assert_eq!(channel.capacity(), 64);
    }

    // --- Subscribe tests ---

    #[test]
    fn test_subscribe() {
        let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());

        let id1 = channel.subscribe("sub1").unwrap();
        let id2 = channel.subscribe("sub2").unwrap();

        assert_eq!(id1, 0);
        assert_eq!(id2, 1);
        assert_eq!(channel.subscriber_count(), 2);
    }

    #[test]
    fn test_subscribe_max_limit() {
        let config = BroadcastConfig::builder().max_subscribers(2).build();
        let channel = BroadcastChannel::<i32>::new(config);

        channel.subscribe("sub1").unwrap();
        channel.subscribe("sub2").unwrap();

        let result = channel.subscribe("sub3");
        assert!(matches!(
            result,
            Err(BroadcastError::MaxSubscribersReached(2))
        ));
    }

    #[test]
    fn test_unsubscribe() {
        let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());

        let id = channel.subscribe("sub1").unwrap();
        assert_eq!(channel.subscriber_count(), 1);

        channel.unsubscribe(id);
        assert_eq!(channel.subscriber_count(), 0);
    }

    #[test]
    fn test_unsubscribe_nonexistent() {
        let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());
        channel.unsubscribe(999); // Should not panic
    }

    // --- Broadcast and read tests ---

    #[test]
    fn test_broadcast_no_subscribers() {
        let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());

        let result = channel.broadcast(42);
        assert!(matches!(result, Err(BroadcastError::NoSubscribers)));
    }

    #[test]
    fn test_broadcast_and_read_single_subscriber() {
        let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());

        let id = channel.subscribe("sub1").unwrap();

        channel.broadcast(1).unwrap();
        channel.broadcast(2).unwrap();
        channel.broadcast(3).unwrap();

        assert_eq!(channel.read(id), Some(1));
        assert_eq!(channel.read(id), Some(2));
        assert_eq!(channel.read(id), Some(3));
        assert_eq!(channel.read(id), None); // Caught up
    }

    #[test]
    fn test_broadcast_and_read_multiple_subscribers() {
        let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());

        let id1 = channel.subscribe("sub1").unwrap();
        let id2 = channel.subscribe("sub2").unwrap();

        channel.broadcast(42).unwrap();

        // Both subscribers should receive the same value
        assert_eq!(channel.read(id1), Some(42));
        assert_eq!(channel.read(id2), Some(42));
    }

    #[test]
    fn test_read_unsubscribed() {
        let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());

        let id = channel.subscribe("sub1").unwrap();
        channel.broadcast(42).unwrap();

        channel.unsubscribe(id);

        assert_eq!(channel.read(id), None);
    }

    #[test]
    fn test_read_nonexistent_subscriber() {
        let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());
        assert_eq!(channel.read(999), None);
    }

    #[test]
    fn test_try_read() {
        let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());

        let id = channel.subscribe("sub1").unwrap();
        channel.broadcast(42).unwrap();

        assert_eq!(channel.try_read(id).unwrap(), Some(42));
        assert_eq!(channel.try_read(id).unwrap(), None); // Caught up
    }

    #[test]
    fn test_try_read_nonexistent() {
        let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());
        let result = channel.try_read(999);
        assert!(matches!(
            result,
            Err(BroadcastError::SubscriberNotFound(999))
        ));
    }

    // --- Lag tracking tests ---

    #[test]
    fn test_subscriber_lag() {
        let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());

        let id = channel.subscribe("sub1").unwrap();

        assert_eq!(channel.subscriber_lag(id), 0);

        channel.broadcast(1).unwrap();
        channel.broadcast(2).unwrap();
        channel.broadcast(3).unwrap();

        assert_eq!(channel.subscriber_lag(id), 3);

        channel.read(id);
        assert_eq!(channel.subscriber_lag(id), 2);
    }

    #[test]
    fn test_slowest_cursor() {
        let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());

        // No subscribers
        assert_eq!(channel.slowest_cursor(), u64::MAX);

        let id1 = channel.subscribe("sub1").unwrap();
        let _id2 = channel.subscribe("sub2").unwrap();

        channel.broadcast(1).unwrap();
        channel.broadcast(2).unwrap();

        // Both at position 0
        assert_eq!(channel.slowest_cursor(), 0);

        // sub1 reads one
        channel.read(id1);
        assert_eq!(channel.slowest_cursor(), 0); // sub2 is still at 0
    }

    #[test]
    fn test_is_lagging() {
        let config = BroadcastConfig::builder()
            .capacity(256)
            .lag_warning_threshold(3)
            .build();
        let channel = BroadcastChannel::<i32>::new(config);

        let id = channel.subscribe("sub1").unwrap();

        assert!(!channel.is_lagging(id));

        channel.broadcast(1).unwrap();
        channel.broadcast(2).unwrap();
        assert!(!channel.is_lagging(id));

        channel.broadcast(3).unwrap();
        assert!(channel.is_lagging(id)); // Now lagging (3 >= threshold 3)
    }

    // --- Slow subscriber policy tests ---

    #[test]
    fn test_skip_for_slow_policy() {
        let config = BroadcastConfig::builder()
            .capacity(4)
            .slow_subscriber_policy(SlowSubscriberPolicy::SkipForSlow)
            .build();
        let channel = BroadcastChannel::<i32>::new(config);

        let _id = channel.subscribe("slow").unwrap();

        // Fill buffer beyond capacity - should not error
        for i in 0..10 {
            channel.broadcast(i).unwrap();
        }

        // Slow subscriber missed some values
        assert_eq!(channel.write_position(), 10);
    }

    #[test]
    fn test_drop_slow_policy() {
        let config = BroadcastConfig::builder()
            .capacity(4)
            .slow_subscriber_policy(SlowSubscriberPolicy::DropSlow)
            .build();
        let channel = BroadcastChannel::<i32>::new(config);

        let id1 = channel.subscribe("slow").unwrap();
        let id2 = channel.subscribe("fast").unwrap();

        // Fill buffer - slow subscriber should be dropped
        for i in 0..10 {
            // Fast subscriber reads immediately
            channel.read(id2);
            channel.broadcast(i).unwrap();
        }

        // Slow subscriber was dropped
        assert!(channel.subscriber_info(id1).map_or(true, |i| !i.active));
    }

    // --- Channel close tests ---

    #[test]
    fn test_channel_close() {
        let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());
        let id = channel.subscribe("sub1").unwrap();

        channel.broadcast(42).unwrap();
        channel.close();

        assert!(channel.is_closed());

        // Can still read buffered data
        assert_eq!(channel.read(id), Some(42));

        // Cannot broadcast new data
        let result = channel.broadcast(43);
        assert!(matches!(result, Err(BroadcastError::Closed)));
    }

    // --- Subscriber info tests ---

    #[test]
    fn test_subscriber_info() {
        let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());

        let id = channel.subscribe("test_sub").unwrap();
        channel.broadcast(1).unwrap();
        channel.broadcast(2).unwrap();

        let info = channel.subscriber_info(id).unwrap();
        assert_eq!(info.id, id);
        assert_eq!(info.name, "test_sub");
        assert!(info.active);
        assert_eq!(info.read_position, 0);
        assert_eq!(info.lag, 2);
    }

    #[test]
    fn test_list_subscribers() {
        let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());

        channel.subscribe("sub1").unwrap();
        channel.subscribe("sub2").unwrap();
        let id3 = channel.subscribe("sub3").unwrap();

        channel.unsubscribe(id3);

        let subscribers = channel.list_subscribers();
        assert_eq!(subscribers.len(), 2);
        assert!(subscribers.iter().any(|s| s.name == "sub1"));
        assert!(subscribers.iter().any(|s| s.name == "sub2"));
    }

    // --- Debug format test ---

    #[test]
    fn test_debug_format() {
        let channel = BroadcastChannel::<i32>::new(BroadcastConfig::with_capacity(16));
        channel.subscribe("sub1").unwrap();

        let debug = format!("{channel:?}");
        assert!(debug.contains("BroadcastChannel"));
        assert!(debug.contains("capacity"));
        assert!(debug.contains("subscriber_count"));
    }

    // --- Error display tests ---

    #[test]
    fn test_error_display() {
        let e1 = BroadcastError::MaxSubscribersReached(10);
        assert!(e1.to_string().contains("maximum subscribers (10)"));

        let e2 = BroadcastError::SlowSubscriberTimeout(Duration::from_secs(5));
        assert!(e2.to_string().contains("slow subscriber timeout"));

        let e3 = BroadcastError::NoSubscribers;
        assert!(e3.to_string().contains("no active subscribers"));

        let e4 = BroadcastError::SubscriberNotFound(42);
        assert!(e4.to_string().contains("subscriber 42 not found"));

        let e5 = BroadcastError::BufferFull;
        assert!(e5.to_string().contains("buffer full"));

        let e6 = BroadcastError::Closed;
        assert!(e6.to_string().contains("channel closed"));
    }

    // --- Concurrent tests ---

    #[test]
    fn test_concurrent_subscribe_read() {
        let channel = Arc::new(BroadcastChannel::<i32>::new(BroadcastConfig::default()));
        let channel_clone = Arc::clone(&channel);

        // Subscribe in main thread
        let id = channel.subscribe("main").unwrap();

        // Broadcast in another thread
        let producer = thread::spawn(move || {
            for i in 0..100 {
                channel_clone.broadcast(i).unwrap();
            }
        });

        // Read in main thread
        let received = collect_values(&channel, id, 100);

        producer.join().unwrap();

        // Every value arrives exactly once, in broadcast order
        let expected: Vec<i32> = (0..100).collect();
        assert_eq!(received, expected);
    }

    #[test]
    fn test_multiple_concurrent_readers() {
        let channel = Arc::new(BroadcastChannel::<i32>::new(BroadcastConfig::default()));

        let id1 = channel.subscribe("reader1").unwrap();
        let id2 = channel.subscribe("reader2").unwrap();

        let channel1 = Arc::clone(&channel);
        let channel2 = Arc::clone(&channel);
        let channel_prod = Arc::clone(&channel);

        // Producer
        let producer = thread::spawn(move || {
            for i in 0..50 {
                channel_prod.broadcast(i).unwrap();
            }
        });

        // Readers, each draining its own cursor on its own thread
        let reader1 = thread::spawn(move || collect_values(&channel1, id1, 50));
        let reader2 = thread::spawn(move || collect_values(&channel2, id2, 50));

        producer.join().unwrap();
        let r1 = reader1.join().unwrap();
        let r2 = reader2.join().unwrap();

        // Both readers should receive all 50 values in order. Comparing against
        // the expected sequence (not just against each other) also catches the
        // case where both readers observe the same wrong data.
        let expected: Vec<i32> = (0..50).collect();
        assert_eq!(r1, expected);
        assert_eq!(r2, expected);
    }
}