Skip to main content

photon_ring/
channel.rs

1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use crate::ring::{Padded, SharedRing};
5use crate::wait::WaitStrategy;
6use alloc::sync::Arc;
7use core::sync::atomic::{AtomicU64, Ordering};
8
9// ---------------------------------------------------------------------------
10// Errors
11// ---------------------------------------------------------------------------
12
/// Error from [`Subscriber::try_recv`].
///
/// Neither variant is fatal: `Empty` means there is nothing to read right
/// now, and `Lagged` is reported after the subscriber's cursor has already
/// been moved forward to the oldest message still held by the ring, so the
/// caller can simply try again.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TryRecvError {
    /// No new messages available.
    Empty,
    /// Consumer fell behind the ring. `skipped` messages were lost.
    Lagged { skipped: u64 },
}

impl core::fmt::Display for TryRecvError {
    /// Human-readable description, suitable for logs and error chains.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            TryRecvError::Empty => f.write_str("no new messages available"),
            TryRecvError::Lagged { skipped } => write!(
                f,
                "subscriber lagged behind the ring; {} messages skipped",
                skipped
            ),
        }
    }
}
21
/// Error returned by [`Publisher::try_publish`] when the ring is full
/// and backpressure is enabled.
///
/// The rejected value is handed back to the caller so it can be retried
/// later without an extra copy being kept around.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PublishError<T> {
    /// The slowest consumer is within the backpressure watermark.
    /// Contains the value that was not published.
    Full(T),
}

impl<T> core::fmt::Display for PublishError<T> {
    /// Human-readable description. Deliberately does not print the payload,
    /// so no `Display`/`Debug` bound is imposed on `T`.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            PublishError::Full(_) => f.write_str(
                "ring is full: slowest subscriber is within the backpressure watermark",
            ),
        }
    }
}
30
31// ---------------------------------------------------------------------------
32// Publisher (single-producer write side)
33// ---------------------------------------------------------------------------
34
35/// The write side of a Photon SPMC channel.
36///
37/// There is exactly one `Publisher` per channel. It is `Send` but not `Sync` —
38/// only one thread may publish at a time (single-producer guarantee enforced
39/// by `&mut self`).
40pub struct Publisher<T: Copy> {
41    ring: Arc<SharedRing<T>>,
42    seq: u64,
43    /// Cached minimum cursor from the last tracker scan. Used as a fast-path
44    /// check to avoid scanning on every `try_publish` call.
45    cached_slowest: u64,
46}
47
48unsafe impl<T: Copy + Send> Send for Publisher<T> {}
49
50impl<T: Copy> Publisher<T> {
51    /// Publish a single value. Zero-allocation, O(1).
52    #[inline]
53    pub fn publish(&mut self, value: T) {
54        self.ring.slot(self.seq).write(self.seq, value);
55        self.ring.cursor.0.store(self.seq, Ordering::Release);
56        self.seq += 1;
57    }
58
59    /// Try to publish a single value with backpressure awareness.
60    ///
61    /// - On a regular (lossy) channel created with [`channel()`], this always
62    ///   succeeds — it publishes the value and returns `Ok(())`.
63    /// - On a bounded channel created with [`channel_bounded()`], this checks
64    ///   whether the slowest subscriber has fallen too far behind. If
65    ///   `publisher_seq - slowest_cursor >= capacity - watermark`, it returns
66    ///   `Err(PublishError::Full(value))` without writing.
67    #[inline]
68    pub fn try_publish(&mut self, value: T) -> Result<(), PublishError<T>> {
69        if let Some(bp) = self.ring.backpressure.as_ref() {
70            let capacity = self.ring.capacity();
71            let effective = capacity - bp.watermark;
72
73            // Fast path: use cached slowest cursor.
74            if self.seq >= self.cached_slowest + effective {
75                // Slow path: rescan all trackers.
76                match self.ring.slowest_cursor() {
77                    Some(slowest) => {
78                        self.cached_slowest = slowest;
79                        if self.seq >= slowest + effective {
80                            return Err(PublishError::Full(value));
81                        }
82                    }
83                    None => {
84                        // No subscribers registered yet — ring is unbounded.
85                    }
86                }
87            }
88        }
89        self.publish(value);
90        Ok(())
91    }
92
93    /// Publish a batch of values with a single cursor update.
94    ///
95    /// Each slot is written atomically (seqlock), but the cursor advances only
96    /// once at the end — consumers see the entire batch appear at once, and
97    /// cache-line bouncing on the shared cursor is reduced to one store.
98    #[inline]
99    pub fn publish_batch(&mut self, values: &[T]) {
100        if values.is_empty() {
101            return;
102        }
103        for (i, &v) in values.iter().enumerate() {
104            let seq = self.seq + i as u64;
105            self.ring.slot(seq).write(seq, v);
106        }
107        let last = self.seq + values.len() as u64 - 1;
108        self.ring.cursor.0.store(last, Ordering::Release);
109        self.seq += values.len() as u64;
110    }
111
112    /// Number of messages published so far.
113    #[inline]
114    pub fn published(&self) -> u64 {
115        self.seq
116    }
117
118    /// Current sequence number (same as `published()`).
119    /// Useful for computing lag: `publisher.sequence() - subscriber.cursor`.
120    #[inline]
121    pub fn sequence(&self) -> u64 {
122        self.seq
123    }
124
125    /// Ring capacity (power of two).
126    #[inline]
127    pub fn capacity(&self) -> u64 {
128        self.ring.capacity()
129    }
130
131    /// Lock the ring buffer pages in RAM, preventing the OS from swapping
132    /// them to disk. Reduces worst-case latency by eliminating page-fault
133    /// stalls on the hot path.
134    ///
135    /// Returns `true` on success. Requires `CAP_IPC_LOCK` or sufficient
136    /// `RLIMIT_MEMLOCK` on Linux. No-op on other platforms.
137    #[cfg(all(target_os = "linux", feature = "hugepages"))]
138    pub fn mlock(&self) -> bool {
139        let ptr = self.ring.slots_ptr() as *const u8;
140        let len = self.ring.slots_byte_len();
141        unsafe { crate::mem::mlock_pages(ptr, len) }
142    }
143
144    /// Pre-fault all ring buffer pages by writing a zero byte to each 4 KiB
145    /// page. Ensures the first publish does not trigger a page fault.
146    #[cfg(all(target_os = "linux", feature = "hugepages"))]
147    pub fn prefault(&self) {
148        let ptr = self.ring.slots_ptr() as *mut u8;
149        let len = self.ring.slots_byte_len();
150        unsafe { crate::mem::prefault_pages(ptr, len) }
151    }
152}
153
154// ---------------------------------------------------------------------------
155// Subscribable (factory for subscribers)
156// ---------------------------------------------------------------------------
157
158/// Clone-able handle for spawning [`Subscriber`]s.
159///
160/// Send this to other threads and call [`subscribe`](Subscribable::subscribe)
161/// to create independent consumers.
162pub struct Subscribable<T: Copy> {
163    ring: Arc<SharedRing<T>>,
164}
165
166impl<T: Copy> Clone for Subscribable<T> {
167    fn clone(&self) -> Self {
168        Subscribable {
169            ring: self.ring.clone(),
170        }
171    }
172}
173
174unsafe impl<T: Copy + Send> Send for Subscribable<T> {}
175unsafe impl<T: Copy + Send> Sync for Subscribable<T> {}
176
177impl<T: Copy> Subscribable<T> {
178    /// Create a subscriber that will see only **future** messages.
179    pub fn subscribe(&self) -> Subscriber<T> {
180        let head = self.ring.cursor.0.load(Ordering::Acquire);
181        let start = if head == u64::MAX { 0 } else { head + 1 };
182        let tracker = self.ring.register_tracker(start);
183        Subscriber {
184            ring: self.ring.clone(),
185            cursor: start,
186            tracker,
187            total_lagged: 0,
188            total_received: 0,
189        }
190    }
191
192    /// Create a [`SubscriberGroup`] of `N` subscribers starting from the next
193    /// message. All `N` logical subscribers share a single ring read — the
194    /// seqlock is checked once and all cursors are advanced together.
195    ///
196    /// This is dramatically faster than `N` independent [`Subscriber`]s when
197    /// polled in a loop on the same thread.
198    pub fn subscribe_group<const N: usize>(&self) -> SubscriberGroup<T, N> {
199        let head = self.ring.cursor.0.load(Ordering::Acquire);
200        let start = if head == u64::MAX { 0 } else { head + 1 };
201        SubscriberGroup {
202            ring: self.ring.clone(),
203            cursors: [start; N],
204            total_lagged: 0,
205            total_received: 0,
206        }
207    }
208
209    /// Create a subscriber starting from the **oldest available** message
210    /// still in the ring (or 0 if nothing published yet).
211    pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
212        let head = self.ring.cursor.0.load(Ordering::Acquire);
213        let cap = self.ring.capacity();
214        let start = if head == u64::MAX {
215            0
216        } else if head >= cap {
217            head - cap + 1
218        } else {
219            0
220        };
221        let tracker = self.ring.register_tracker(start);
222        Subscriber {
223            ring: self.ring.clone(),
224            cursor: start,
225            tracker,
226            total_lagged: 0,
227            total_received: 0,
228        }
229    }
230}
231
232// ---------------------------------------------------------------------------
233// Subscriber (consumer read side)
234// ---------------------------------------------------------------------------
235
236/// The read side of a Photon SPMC channel.
237///
238/// Each subscriber has its own cursor — no contention between consumers.
239pub struct Subscriber<T: Copy> {
240    ring: Arc<SharedRing<T>>,
241    cursor: u64,
242    /// Per-subscriber cursor tracker for backpressure. `None` on regular
243    /// (lossy) channels — zero overhead.
244    tracker: Option<Arc<Padded<AtomicU64>>>,
245    /// Cumulative messages skipped due to lag.
246    total_lagged: u64,
247    /// Cumulative messages successfully received.
248    total_received: u64,
249}
250
251unsafe impl<T: Copy + Send> Send for Subscriber<T> {}
252
253impl<T: Copy> Subscriber<T> {
254    /// Try to receive the next message without blocking.
255    #[inline]
256    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
257        self.read_slot()
258    }
259
260    /// Spin until the next message is available and return it.
261    ///
262    /// Uses a two-phase spin strategy: bare spin for the first 64 iterations
263    /// (minimum wakeup latency, ~0 ns reaction time), then `PAUSE`-based spin
264    /// (saves power, yields to SMT sibling). On Skylake+, `PAUSE` adds ~140
265    /// cycles of delay per iteration — the bare-spin phase avoids this penalty
266    /// when the message arrives quickly (typical for cross-thread pub/sub).
267    #[inline]
268    pub fn recv(&mut self) -> T {
269        let slot = self.ring.slot(self.cursor);
270        let expected = self.cursor * 2 + 2;
271        // Phase 1: bare spin — no PAUSE, minimum wakeup latency
272        for _ in 0..64 {
273            match slot.try_read(self.cursor) {
274                Ok(Some(value)) => {
275                    self.cursor += 1;
276                    self.update_tracker();
277                    return value;
278                }
279                Ok(None) => {}
280                Err(stamp) => {
281                    if stamp >= expected {
282                        return self.recv_slow();
283                    }
284                }
285            }
286        }
287        // Phase 2: PAUSE-based spin — power efficient
288        loop {
289            match slot.try_read(self.cursor) {
290                Ok(Some(value)) => {
291                    self.cursor += 1;
292                    self.update_tracker();
293                    return value;
294                }
295                Ok(None) => core::hint::spin_loop(),
296                Err(stamp) => {
297                    if stamp < expected {
298                        core::hint::spin_loop();
299                    } else {
300                        return self.recv_slow();
301                    }
302                }
303            }
304        }
305    }
306
307    /// Slow path for lag recovery in recv().
308    #[cold]
309    #[inline(never)]
310    fn recv_slow(&mut self) -> T {
311        loop {
312            match self.try_recv() {
313                Ok(val) => return val,
314                Err(TryRecvError::Empty) => core::hint::spin_loop(),
315                Err(TryRecvError::Lagged { .. }) => {}
316            }
317        }
318    }
319
320    /// Block until the next message using the given [`WaitStrategy`].
321    ///
322    /// Unlike [`recv()`](Self::recv), which hard-codes a two-phase spin,
323    /// this method delegates idle behaviour to the strategy — enabling
324    /// yield-based, park-based, or adaptive waiting.
325    ///
326    /// # Example
327    /// ```
328    /// use photon_ring::{channel, WaitStrategy};
329    ///
330    /// let (mut p, s) = channel::<u64>(64);
331    /// let mut sub = s.subscribe();
332    /// p.publish(7);
333    /// assert_eq!(sub.recv_with(WaitStrategy::BusySpin), 7);
334    /// ```
335    #[inline]
336    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
337        let mut iter: u32 = 0;
338        loop {
339            match self.try_recv() {
340                Ok(val) => return val,
341                Err(TryRecvError::Empty) => {
342                    strategy.wait(iter);
343                    iter = iter.saturating_add(1);
344                }
345                Err(TryRecvError::Lagged { .. }) => {
346                    // Cursor was advanced by try_recv — retry immediately.
347                    iter = 0;
348                }
349            }
350        }
351    }
352
353    /// Skip to the **latest** published message (discards intermediate ones).
354    ///
355    /// Returns `None` only if nothing has been published yet. Under heavy
356    /// producer load, retries internally if the target slot is mid-write.
357    pub fn latest(&mut self) -> Option<T> {
358        loop {
359            let head = self.ring.cursor.0.load(Ordering::Acquire);
360            if head == u64::MAX {
361                return None;
362            }
363            self.cursor = head;
364            match self.read_slot() {
365                Ok(v) => return Some(v),
366                Err(TryRecvError::Empty) => return None,
367                Err(TryRecvError::Lagged { .. }) => {
368                    // Producer lapped us between cursor read and slot read.
369                    // Retry with updated head.
370                }
371            }
372        }
373    }
374
375    /// How many messages are available to read (capped at ring capacity).
376    #[inline]
377    pub fn pending(&self) -> u64 {
378        let head = self.ring.cursor.0.load(Ordering::Acquire);
379        if head == u64::MAX || self.cursor > head {
380            0
381        } else {
382            let raw = head - self.cursor + 1;
383            raw.min(self.ring.capacity())
384        }
385    }
386
387    /// Total messages successfully received by this subscriber.
388    #[inline]
389    pub fn total_received(&self) -> u64 {
390        self.total_received
391    }
392
393    /// Total messages lost due to lag (consumer fell behind the ring).
394    #[inline]
395    pub fn total_lagged(&self) -> u64 {
396        self.total_lagged
397    }
398
399    /// Ratio of received to total (received + lagged). Returns 0.0 if no
400    /// messages have been processed.
401    #[inline]
402    pub fn receive_ratio(&self) -> f64 {
403        let total = self.total_received + self.total_lagged;
404        if total == 0 {
405            0.0
406        } else {
407            self.total_received as f64 / total as f64
408        }
409    }
410
411    /// Update the backpressure tracker to reflect the current cursor position.
412    /// No-op on regular (lossy) channels.
413    #[inline]
414    fn update_tracker(&self) {
415        if let Some(ref tracker) = self.tracker {
416            tracker.0.store(self.cursor, Ordering::Release);
417        }
418    }
419
420    /// Stamp-only fast-path read. The consumer's local `self.cursor` tells us
421    /// which slot and expected stamp to check — no shared cursor load needed
422    /// on the hot path.
423    #[inline]
424    fn read_slot(&mut self) -> Result<T, TryRecvError> {
425        let slot = self.ring.slot(self.cursor);
426        let expected = self.cursor * 2 + 2;
427
428        match slot.try_read(self.cursor) {
429            Ok(Some(value)) => {
430                self.cursor += 1;
431                self.update_tracker();
432                self.total_received += 1;
433                Ok(value)
434            }
435            Ok(None) => {
436                // Torn read or write-in-progress — treat as empty for try_recv
437                Err(TryRecvError::Empty)
438            }
439            Err(actual_stamp) => {
440                // Odd stamp means write-in-progress — not ready yet
441                if actual_stamp & 1 != 0 {
442                    return Err(TryRecvError::Empty);
443                }
444                if actual_stamp < expected {
445                    // Slot holds an older (or no) sequence — not published yet
446                    Err(TryRecvError::Empty)
447                } else {
448                    // stamp > expected: slot was overwritten — slow path.
449                    // Read head cursor to compute exact lag.
450                    let head = self.ring.cursor.0.load(Ordering::Acquire);
451                    let cap = self.ring.capacity();
452                    if head == u64::MAX || self.cursor > head {
453                        // Rare race: stamp updated but cursor not yet visible
454                        return Err(TryRecvError::Empty);
455                    }
456                    if head >= cap {
457                        let oldest = head - cap + 1;
458                        if self.cursor < oldest {
459                            let skipped = oldest - self.cursor;
460                            self.cursor = oldest;
461                            self.update_tracker();
462                            self.total_lagged += skipped;
463                            return Err(TryRecvError::Lagged { skipped });
464                        }
465                    }
466                    // Head hasn't caught up yet (rare timing race)
467                    Err(TryRecvError::Empty)
468                }
469            }
470        }
471    }
472}
473
474// ---------------------------------------------------------------------------
475// SubscriberGroup (batched multi-consumer read)
476// ---------------------------------------------------------------------------
477
478/// A group of `N` logical subscribers backed by a single ring read.
479///
480/// When all `N` cursors are at the same position (the common case),
481/// [`try_recv`](SubscriberGroup::try_recv) performs **one** seqlock read
482/// and advances all `N` cursors — reducing per-subscriber overhead from
483/// ~1.1 ns to ~0.15 ns.
484///
485/// ```
486/// let (mut p, subs) = photon_ring::channel::<u64>(64);
487/// let mut group = subs.subscribe_group::<4>();
488/// p.publish(42);
489/// assert_eq!(group.try_recv(), Ok(42));
490/// ```
491pub struct SubscriberGroup<T: Copy, const N: usize> {
492    ring: Arc<SharedRing<T>>,
493    cursors: [u64; N],
494    /// Cumulative messages skipped due to lag.
495    total_lagged: u64,
496    /// Cumulative messages successfully received.
497    total_received: u64,
498}
499
500unsafe impl<T: Copy + Send, const N: usize> Send for SubscriberGroup<T, N> {}
501
502impl<T: Copy, const N: usize> SubscriberGroup<T, N> {
503    /// Try to receive the next message for the group.
504    ///
505    /// On the fast path (all cursors aligned), this does a single seqlock
506    /// read and sweeps all `N` cursors — the compiler unrolls the cursor
507    /// increment loop for small `N`.
508    #[inline]
509    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
510        // Fast path: all cursors at the same position (common case).
511        let first = self.cursors[0];
512        let slot = self.ring.slot(first);
513        let expected = first * 2 + 2;
514
515        match slot.try_read(first) {
516            Ok(Some(value)) => {
517                // Single seqlock read succeeded — advance all aligned cursors.
518                for c in self.cursors.iter_mut() {
519                    if *c == first {
520                        *c = first + 1;
521                    }
522                }
523                self.total_received += 1;
524                Ok(value)
525            }
526            Ok(None) => Err(TryRecvError::Empty),
527            Err(actual_stamp) => {
528                if actual_stamp & 1 != 0 || actual_stamp < expected {
529                    return Err(TryRecvError::Empty);
530                }
531                // Lagged — recompute from head cursor
532                let head = self.ring.cursor.0.load(Ordering::Acquire);
533                let cap = self.ring.capacity();
534                if head == u64::MAX || first > head {
535                    return Err(TryRecvError::Empty);
536                }
537                if head >= cap {
538                    let oldest = head - cap + 1;
539                    if first < oldest {
540                        let skipped = oldest - first;
541                        for c in self.cursors.iter_mut() {
542                            if *c < oldest {
543                                *c = oldest;
544                            }
545                        }
546                        self.total_lagged += skipped;
547                        return Err(TryRecvError::Lagged { skipped });
548                    }
549                }
550                Err(TryRecvError::Empty)
551            }
552        }
553    }
554
555    /// Spin until the next message is available.
556    #[inline]
557    pub fn recv(&mut self) -> T {
558        loop {
559            match self.try_recv() {
560                Ok(val) => return val,
561                Err(TryRecvError::Empty) => core::hint::spin_loop(),
562                Err(TryRecvError::Lagged { .. }) => {}
563            }
564        }
565    }
566
567    /// Block until the next message using the given [`WaitStrategy`].
568    ///
569    /// Like [`Subscriber::recv_with`], but for the grouped fast path.
570    ///
571    /// # Example
572    /// ```
573    /// use photon_ring::{channel, WaitStrategy};
574    ///
575    /// let (mut p, s) = channel::<u64>(64);
576    /// let mut group = s.subscribe_group::<2>();
577    /// p.publish(42);
578    /// assert_eq!(group.recv_with(WaitStrategy::BusySpin), 42);
579    /// ```
580    #[inline]
581    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
582        let mut iter: u32 = 0;
583        loop {
584            match self.try_recv() {
585                Ok(val) => return val,
586                Err(TryRecvError::Empty) => {
587                    strategy.wait(iter);
588                    iter = iter.saturating_add(1);
589                }
590                Err(TryRecvError::Lagged { .. }) => {
591                    iter = 0;
592                }
593            }
594        }
595    }
596
597    /// How many of the `N` cursors are at the minimum (aligned) position.
598    pub fn aligned_count(&self) -> usize {
599        let min = self.cursors.iter().copied().min().unwrap_or(0);
600        self.cursors.iter().filter(|&&c| c == min).count()
601    }
602
603    /// Number of messages available (based on the slowest cursor).
604    pub fn pending(&self) -> u64 {
605        let head = self.ring.cursor.0.load(Ordering::Acquire);
606        let min = self.cursors.iter().copied().min().unwrap_or(0);
607        if head == u64::MAX || min > head {
608            0
609        } else {
610            let raw = head - min + 1;
611            raw.min(self.ring.capacity())
612        }
613    }
614
615    /// Total messages successfully received by this group.
616    #[inline]
617    pub fn total_received(&self) -> u64 {
618        self.total_received
619    }
620
621    /// Total messages lost due to lag (group fell behind the ring).
622    #[inline]
623    pub fn total_lagged(&self) -> u64 {
624        self.total_lagged
625    }
626
627    /// Ratio of received to total (received + lagged). Returns 0.0 if no
628    /// messages have been processed.
629    #[inline]
630    pub fn receive_ratio(&self) -> f64 {
631        let total = self.total_received + self.total_lagged;
632        if total == 0 {
633            0.0
634        } else {
635            self.total_received as f64 / total as f64
636        }
637    }
638}
639
640// ---------------------------------------------------------------------------
641// Constructors
642// ---------------------------------------------------------------------------
643
644/// Create a Photon SPMC channel.
645///
646/// `capacity` must be a power of two (>= 2). Returns the single-producer
647/// write end and a clone-able factory for creating consumers.
648///
649/// # Example
650/// ```
651/// let (mut pub_, subs) = photon_ring::channel::<u64>(64);
652/// let mut sub = subs.subscribe();
653/// pub_.publish(42);
654/// assert_eq!(sub.try_recv(), Ok(42));
655/// ```
656pub fn channel<T: Copy + Send>(capacity: usize) -> (Publisher<T>, Subscribable<T>) {
657    let ring = Arc::new(SharedRing::new(capacity));
658    (
659        Publisher {
660            ring: ring.clone(),
661            seq: 0,
662            cached_slowest: 0,
663        },
664        Subscribable { ring },
665    )
666}
667
668/// Create a backpressure-capable SPMC channel.
669///
670/// The publisher will refuse to publish (returning [`PublishError::Full`])
671/// when it would overwrite a slot that the slowest subscriber hasn't
672/// read yet, minus `watermark` slots of headroom.
673///
674/// Unlike the default lossy [`channel()`], no messages are ever dropped.
675///
676/// # Arguments
677/// - `capacity` — ring size, must be a power of two (>= 2).
678/// - `watermark` — headroom slots; must be less than `capacity`.
679///   A watermark of 0 means the publisher blocks as soon as all slots are
680///   occupied. A watermark of `capacity - 1` means it blocks when only one
681///   slot is free.
682///
683/// # Example
684/// ```
685/// use photon_ring::channel_bounded;
686/// use photon_ring::PublishError;
687///
688/// let (mut p, s) = channel_bounded::<u64>(4, 0);
689/// let mut sub = s.subscribe();
690///
691/// // Fill the ring (4 slots).
692/// for i in 0u64..4 {
693///     p.try_publish(i).unwrap();
694/// }
695///
696/// // Ring is full — backpressure kicks in.
697/// assert_eq!(p.try_publish(99u64), Err(PublishError::Full(99)));
698///
699/// // Drain one slot — publisher can continue.
700/// assert_eq!(sub.try_recv(), Ok(0));
701/// p.try_publish(99).unwrap();
702/// ```
703pub fn channel_bounded<T: Copy + Send>(
704    capacity: usize,
705    watermark: usize,
706) -> (Publisher<T>, Subscribable<T>) {
707    let ring = Arc::new(SharedRing::new_bounded(capacity, watermark));
708    (
709        Publisher {
710            ring: ring.clone(),
711            seq: 0,
712            cached_slowest: 0,
713        },
714        Subscribable { ring },
715    )
716}