//! Disruptor-pattern queue (kovan_queue/disruptor.rs): sequences,
//! single/multi-producer sequencers, wait strategies, the ring buffer,
//! batch event processors, and the `Disruptor` builder facade.
1use crate::utils::CacheAligned;
2use std::cell::UnsafeCell;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
5use std::thread;
6
/// A sequence number in the Disruptor.
///
/// Padded to prevent false sharing: `CacheAligned` keeps the hot atomic
/// counter on its own cache line so producer and consumer counters do not
/// ping-pong the same line between cores.
#[derive(Debug)]
pub struct Sequence {
    // The counter itself; by convention -1 means "nothing claimed/published
    // yet" (see the `new(-1)` call sites).
    value: CacheAligned<AtomicI64>,
}
14
15impl Sequence {
16    /// Creates a new sequence with the given initial value.
17    pub fn new(initial: i64) -> Self {
18        Sequence {
19            value: CacheAligned::new(AtomicI64::new(initial)),
20        }
21    }
22
23    /// Gets the current value of the sequence.
24    pub fn get(&self) -> i64 {
25        self.value.load(Ordering::Acquire)
26    }
27
28    /// Sets the value of the sequence.
29    pub fn set(&self, value: i64) {
30        self.value.store(value, Ordering::Release);
31    }
32
33    /// Atomically compares and sets the value of the sequence.
34    ///
35    /// Returns `true` if the swap occurred.
36    pub fn compare_and_set(&self, current: i64, new: i64) -> bool {
37        self.value
38            .compare_exchange(current, new, Ordering::SeqCst, Ordering::Relaxed)
39            .is_ok()
40    }
41}
42
/// Sequencer trait for claiming and publishing slots.
///
/// Implementations coordinate producers (who claim and publish sequences)
/// with consumers, whose progress is tracked via gating sequences.
pub trait Sequencer: Send + Sync {
    /// Claims the next sequence, waiting until the slot it maps to has been
    /// released by the slowest gating consumer.
    fn next(&self) -> i64;

    /// Publishes the given sequence, making it visible to consumers.
    fn publish(&self, sequence: i64);

    /// Gets the current cursor value. For the single-producer implementation
    /// this is the highest *published* sequence; for the multi-producer one
    /// it is the highest *claimed* sequence, so readers must clamp it via
    /// `get_highest_published_sequence`.
    fn get_cursor(&self) -> i64;

    /// Adds gating sequences (consumer progress counters) that `next` must
    /// not overrun.
    fn add_gating_sequences(&self, sequences: Vec<Arc<Sequence>>);

    /// Gets the highest sequence in `next_sequence..=available_sequence`
    /// that is contiguously published and therefore safe to read.
    fn get_highest_published_sequence(&self, next_sequence: i64, available_sequence: i64) -> i64;
}
60
/// Single Producer Sequencer.
///
/// Optimized for single-threaded publishing: `next` performs no atomic
/// read-modify-write because only one thread ever claims sequences.
pub struct SingleProducerSequencer {
    /// The cursor for the sequencer (highest published sequence).
    cursor: Arc<Sequence>,
    /// The next sequence to be claimed (touched only by the producer thread).
    next_sequence: Sequence,
    /// Sequences to gate on (prevent wrapping over slow consumers).
    /// NOTE(review): plain `UnsafeCell` — soundness relies on this only being
    /// mutated before concurrent use; confirm against `Disruptor::start`.
    gating_sequences: UnsafeCell<Vec<Arc<Sequence>>>,
    /// Size of the buffer.
    buffer_size: usize,
    /// Strategy for waiting.
    wait_strategy: Arc<dyn WaitStrategy>,
}

// SAFETY(review): these impls are needed only because of the `UnsafeCell`
// field; all other fields are Send + Sync. Sound iff `gating_sequences` is
// never mutated while shared across threads — TODO confirm all call sites.
unsafe impl Send for SingleProducerSequencer {}
unsafe impl Sync for SingleProducerSequencer {}
79
80impl SingleProducerSequencer {
81    pub fn new(buffer_size: usize, wait_strategy: Arc<dyn WaitStrategy>) -> Self {
82        Self {
83            cursor: Arc::new(Sequence::new(-1)),
84            next_sequence: Sequence::new(-1),
85            gating_sequences: UnsafeCell::new(Vec::new()),
86            buffer_size,
87            wait_strategy,
88        }
89    }
90}
91
92impl Sequencer for SingleProducerSequencer {
93    fn next(&self) -> i64 {
94        let next = self.next_sequence.get() + 1;
95        self.next_sequence.set(next);
96
97        let wrap_point = next - self.buffer_size as i64;
98        let gating_sequences = unsafe { &*self.gating_sequences.get() };
99
100        // Cached gating sequence check could be added here for optimization
101        // For now, simple check
102        let mut min_gating_sequence = i64::MAX;
103        for seq in gating_sequences {
104            let s = seq.get();
105            if s < min_gating_sequence {
106                min_gating_sequence = s;
107            }
108        }
109
110        if wrap_point > min_gating_sequence {
111            while wrap_point > min_gating_sequence {
112                thread::yield_now();
113                min_gating_sequence = i64::MAX;
114                for seq in gating_sequences {
115                    let s = seq.get();
116                    if s < min_gating_sequence {
117                        min_gating_sequence = s;
118                    }
119                }
120            }
121        }
122
123        next
124    }
125
126    fn publish(&self, sequence: i64) {
127        self.cursor.set(sequence);
128        self.wait_strategy.signal_all_when_blocking();
129    }
130
131    fn get_cursor(&self) -> i64 {
132        self.cursor.get()
133    }
134
135    fn add_gating_sequences(&self, sequences: Vec<Arc<Sequence>>) {
136        unsafe {
137            (*self.gating_sequences.get()).extend(sequences);
138        }
139    }
140
141    fn get_highest_published_sequence(&self, _next_sequence: i64, available_sequence: i64) -> i64 {
142        available_sequence
143    }
144}
145
/// Multi Producer Sequencer.
///
/// Thread-safe for multiple producers: slots are claimed with an atomic
/// fetch-add and publication is tracked per slot in `available_buffer`.
pub struct MultiProducerSequencer {
    /// Sequences to gate on.
    /// NOTE(review): plain `UnsafeCell` — soundness relies on this only being
    /// mutated before concurrent use; confirm against `Disruptor::start`.
    gating_sequences: UnsafeCell<Vec<Arc<Sequence>>>,
    /// Size of the buffer.
    buffer_size: usize,
    /// Strategy for waiting.
    wait_strategy: Arc<dyn WaitStrategy>,
    /// The sequence used for claiming slots (highest *claimed*, which may be
    /// ahead of what is published).
    claim_sequence: AtomicI64,
    /// Per-slot record of the last sequence published there; equality with a
    /// queried sequence means that sequence is published (see `is_published`).
    available_buffer: Box<[AtomicI64]>,
    /// Mask for fast modulo operations (requires power-of-two buffer size).
    mask: usize,
}

// SAFETY(review): these impls are needed only because of the `UnsafeCell`
// field; all other fields are Send + Sync. Sound iff `gating_sequences` is
// never mutated while shared across threads — TODO confirm all call sites.
unsafe impl Send for MultiProducerSequencer {}
unsafe impl Sync for MultiProducerSequencer {}
166
167impl MultiProducerSequencer {
168    pub fn new(buffer_size: usize, wait_strategy: Arc<dyn WaitStrategy>) -> Self {
169        let mut available_buffer = Vec::with_capacity(buffer_size);
170        for _ in 0..buffer_size {
171            available_buffer.push(AtomicI64::new(-1));
172        }
173
174        Self {
175            gating_sequences: UnsafeCell::new(Vec::new()),
176            buffer_size,
177            wait_strategy,
178            claim_sequence: AtomicI64::new(-1),
179            available_buffer: available_buffer.into_boxed_slice(),
180            mask: buffer_size - 1,
181        }
182    }
183}
184
185impl Sequencer for MultiProducerSequencer {
186    fn next(&self) -> i64 {
187        let current = self.claim_sequence.fetch_add(1, Ordering::SeqCst);
188        let next = current + 1;
189
190        let wrap_point = next - self.buffer_size as i64;
191        let gating_sequences = unsafe { &*self.gating_sequences.get() };
192
193        let mut min_gating_sequence = i64::MAX;
194        for seq in gating_sequences {
195            let s = seq.get();
196            if s < min_gating_sequence {
197                min_gating_sequence = s;
198            }
199        }
200
201        if wrap_point > min_gating_sequence {
202            while wrap_point > min_gating_sequence {
203                thread::yield_now();
204                min_gating_sequence = i64::MAX;
205                for seq in gating_sequences {
206                    let s = seq.get();
207                    if s < min_gating_sequence {
208                        min_gating_sequence = s;
209                    }
210                }
211            }
212        }
213
214        next
215    }
216
217    fn publish(&self, sequence: i64) {
218        let index = (sequence as usize) & self.mask;
219        self.available_buffer[index].store(sequence, Ordering::Release);
220        self.wait_strategy.signal_all_when_blocking();
221    }
222
223    fn get_cursor(&self) -> i64 {
224        self.claim_sequence.load(Ordering::Relaxed)
225    }
226
227    fn add_gating_sequences(&self, sequences: Vec<Arc<Sequence>>) {
228        unsafe {
229            (*self.gating_sequences.get()).extend(sequences);
230        }
231    }
232
233    fn get_highest_published_sequence(&self, next_sequence: i64, available_sequence: i64) -> i64 {
234        // Check if all sequences up to the requested one are published.
235        // In MP, we must ensure contiguous availability.
236
237        let mut sequence = next_sequence;
238        while sequence <= available_sequence {
239            if !self.is_published(sequence) {
240                return sequence - 1;
241            }
242            sequence += 1;
243        }
244        available_sequence
245    }
246}
247
impl MultiProducerSequencer {
    /// Returns true if `sequence` has been published.
    ///
    /// A slot counts as published only when its `available_buffer` entry
    /// holds this exact sequence number — not merely some sequence mapping
    /// to the same index — so stale entries from earlier laps around the
    /// ring are not mistaken for the current one.
    fn is_published(&self, sequence: i64) -> bool {
        let index = (sequence as usize) & self.mask;
        self.available_buffer[index].load(Ordering::Acquire) == sequence
    }
}
254
/// Strategy for waiting for a sequence to be available.
pub trait WaitStrategy: Send + Sync {
    /// Waits for the given sequence to be available.
    ///
    /// Returns an observed available sequence (at least `sequence`), or
    /// `Err(AlertException)` if the barrier was alerted while waiting.
    fn wait_for(
        &self,
        sequence: i64,
        cursor: &Arc<dyn Sequencer>,
        dependent: &Arc<dyn Sequencer>,
        barrier: &ProcessingSequenceBarrier,
    ) -> Result<i64, AlertException>;

    /// Signals all waiting threads that the cursor has advanced.
    /// A no-op for non-blocking (spinning/yielding) strategies.
    fn signal_all_when_blocking(&self);
}

/// Error returned from `wait_for` when the barrier is alerted; used to
/// interrupt waiting consumers (e.g. for shutdown).
#[derive(Debug, Clone, Copy)]
pub struct AlertException;
272
273/// Busy Spin Wait Strategy.
274///
275/// Low latency but high CPU usage.
276pub struct BusySpinWaitStrategy;
277
278impl WaitStrategy for BusySpinWaitStrategy {
279    fn wait_for(
280        &self,
281        sequence: i64,
282        _cursor: &Arc<dyn Sequencer>,
283        dependent: &Arc<dyn Sequencer>,
284        barrier: &ProcessingSequenceBarrier,
285    ) -> Result<i64, AlertException> {
286        let mut available_sequence;
287        loop {
288            if barrier.is_alerted() {
289                return Err(AlertException);
290            }
291            available_sequence = dependent.get_cursor();
292            if available_sequence >= sequence {
293                return Ok(available_sequence);
294            }
295            std::hint::spin_loop();
296        }
297    }
298
299    fn signal_all_when_blocking(&self) {}
300}
301
302/// Yielding Wait Strategy.
303///
304/// Compromise between latency and CPU usage.
305pub struct YieldingWaitStrategy;
306
307impl WaitStrategy for YieldingWaitStrategy {
308    fn wait_for(
309        &self,
310        sequence: i64,
311        _cursor: &Arc<dyn Sequencer>,
312        dependent: &Arc<dyn Sequencer>,
313        barrier: &ProcessingSequenceBarrier,
314    ) -> Result<i64, AlertException> {
315        let mut counter = 100;
316        let mut available_sequence;
317        loop {
318            if barrier.is_alerted() {
319                return Err(AlertException);
320            }
321            available_sequence = dependent.get_cursor();
322            if available_sequence >= sequence {
323                return Ok(available_sequence);
324            }
325
326            counter -= 1;
327            if counter == 0 {
328                thread::yield_now();
329                counter = 100;
330            } else {
331                std::hint::spin_loop();
332            }
333        }
334    }
335
336    fn signal_all_when_blocking(&self) {}
337}
338
/// Blocking Wait Strategy.
///
/// Uses a lock and condition variable. Lowest CPU usage, at the cost of
/// wakeup latency.
pub struct BlockingWaitStrategy {
    // The mutex guards no data; it exists solely to pair with `condvar` and
    // to serialize waiters against `signal_all_when_blocking`.
    mutex: std::sync::Mutex<()>,
    condvar: std::sync::Condvar,
}
346
impl Default for BlockingWaitStrategy {
    /// Equivalent to [`BlockingWaitStrategy::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl BlockingWaitStrategy {
    /// Creates a new blocking wait strategy with a fresh mutex/condvar pair.
    pub fn new() -> Self {
        Self {
            mutex: std::sync::Mutex::new(()),
            condvar: std::sync::Condvar::new(),
        }
    }
}
362
363impl WaitStrategy for BlockingWaitStrategy {
364    fn wait_for(
365        &self,
366        sequence: i64,
367        _cursor: &Arc<dyn Sequencer>,
368        dependent: &Arc<dyn Sequencer>,
369        barrier: &ProcessingSequenceBarrier,
370    ) -> Result<i64, AlertException> {
371        let mut available_sequence = dependent.get_cursor();
372        if available_sequence < sequence {
373            let mut guard = self.mutex.lock().unwrap();
374            while dependent.get_cursor() < sequence {
375                if barrier.is_alerted() {
376                    return Err(AlertException);
377                }
378                guard = self.condvar.wait(guard).unwrap();
379                // Re-acquire guard is automatic
380                // Loop continues to check condition
381            }
382            available_sequence = dependent.get_cursor();
383        }
384
385        while available_sequence < sequence {
386            if barrier.is_alerted() {
387                return Err(AlertException);
388            }
389            available_sequence = dependent.get_cursor();
390            // Busy spin fallback or check again
391            thread::yield_now();
392        }
393
394        Ok(available_sequence)
395    }
396
397    fn signal_all_when_blocking(&self) {
398        let _guard = self.mutex.lock().unwrap();
399        self.condvar.notify_all();
400    }
401}
402
/// Coordination barrier for tracking dependencies.
///
/// Consumers wait on this barrier; it combines a wait strategy, the
/// sequencer whose progress is awaited, and an alert flag for interrupting
/// waiters (e.g. shutdown).
pub struct ProcessingSequenceBarrier {
    /// Strategy for waiting.
    wait_strategy: Arc<dyn WaitStrategy>,
    /// The sequencer to wait on (dependent).
    dependent_sequencer: Arc<dyn Sequencer>,
    /// The sequencer of the ring buffer (cursor); used to clamp results to
    /// contiguously published sequences (see `wait_for`).
    cursor_sequencer: Arc<dyn Sequencer>,
    /// Whether the barrier has been alerted (stops and wakes waiters).
    alerted: AtomicBool,
}
414
415impl ProcessingSequenceBarrier {
416    /// Creates a new processing sequence barrier.
417    pub fn new(
418        wait_strategy: Arc<dyn WaitStrategy>,
419        dependent_sequencer: Arc<dyn Sequencer>,
420        cursor_sequencer: Arc<dyn Sequencer>,
421    ) -> Self {
422        Self {
423            wait_strategy,
424            dependent_sequencer,
425            cursor_sequencer,
426            alerted: AtomicBool::new(false),
427        }
428    }
429
430    /// Waits for the given sequence to be available.
431    pub fn wait_for(&self, sequence: i64) -> Result<i64, AlertException> {
432        let available = self.wait_strategy.wait_for(
433            sequence,
434            &self.cursor_sequencer,
435            &self.dependent_sequencer,
436            self,
437        )?;
438
439        // Ensure the sequence is fully published (crucial for MP).
440        Ok(self
441            .cursor_sequencer
442            .get_highest_published_sequence(sequence, available))
443    }
444
445    /// Returns true if the barrier has been alerted.
446    pub fn is_alerted(&self) -> bool {
447        self.alerted.load(Ordering::Relaxed)
448    }
449
450    /// Alerts the barrier, causing waiters to wake up.
451    pub fn alert(&self) {
452        self.alerted.store(true, Ordering::Relaxed);
453        self.wait_strategy.signal_all_when_blocking();
454    }
455
456    /// Clears the alert status.
457    pub fn clear_alert(&self) {
458        self.alerted.store(false, Ordering::Relaxed);
459    }
460}
461
/// Event Handler trait.
pub trait EventHandler<T>: Send + Sync {
    /// Called when an event is available for processing.
    ///
    /// `end_of_batch` is true for the last event of the currently available
    /// batch, letting handlers flush per batch instead of per event.
    fn on_event(&self, event: &T, sequence: u64, end_of_batch: bool);
}
467
/// Ring Buffer.
///
/// Pre-allocated event storage indexed by sequence number masked by the
/// (power-of-two) capacity.
pub struct RingBuffer<T> {
    /// The buffer of events; `UnsafeCell` permits slot mutation through
    /// `&self` under the Disruptor's exclusive-claim protocol.
    buffer: Box<[UnsafeCell<T>]>,
    /// Mask for fast modulo operations (capacity - 1, capacity rounded up
    /// to a power of two in `new`).
    mask: usize,
    /// The sequencer managing this buffer.
    sequencer: Arc<dyn Sequencer>,
}

// SAFETY(review): these impls assert that slot access is externally
// synchronized by the sequencer protocol (exclusive claim before write,
// publish before read). Note `Sync` for `T: Send` lets `get_unchecked_mut`
// hand out cross-thread `&mut T`; this is sound only while every caller
// upholds that protocol — confirm no other access paths exist.
unsafe impl<T: Send> Send for RingBuffer<T> {}
unsafe impl<T: Send> Sync for RingBuffer<T> {}
480
481impl<T> RingBuffer<T> {
482    /// Creates a new ring buffer with the given factory, size, and sequencer.
483    pub fn new<F>(factory: F, size: usize, sequencer: Arc<dyn Sequencer>) -> Self
484    where
485        F: Fn() -> T,
486    {
487        let capacity = size.next_power_of_two();
488        let mut buffer = Vec::with_capacity(capacity);
489        for _ in 0..capacity {
490            buffer.push(UnsafeCell::new(factory()));
491        }
492
493        Self {
494            buffer: buffer.into_boxed_slice(),
495            mask: capacity - 1,
496            sequencer,
497        }
498    }
499
500    /// Adds gating sequences to the sequencer.
501    pub fn add_gating_sequences(&self, sequences: Vec<Arc<Sequence>>) {
502        self.sequencer.add_gating_sequences(sequences);
503    }
504
505    /// Gets a reference to the event at the given sequence.
506    pub fn get(&self, sequence: i64) -> &T {
507        unsafe { &*self.buffer[(sequence as usize) & self.mask].get() }
508    }
509
510    /// Gets a mutable reference to the event at the given sequence.
511    pub fn get_mut(&mut self, sequence: i64) -> &mut T {
512        unsafe { &mut *self.buffer[(sequence as usize) & self.mask].get() }
513    }
514
515    /// Unsafe access for high performance (internal use).
516    ///
517    /// # Safety
518    ///
519    /// The caller must ensure that the sequence number is valid and that
520    /// no other thread is concurrently modifying the same slot.
521    pub unsafe fn get_unchecked(&self, sequence: i64) -> &T {
522        unsafe {
523            &*self
524                .buffer
525                .get_unchecked((sequence as usize) & self.mask)
526                .get()
527        }
528    }
529
530    /// Unsafe mutable access for high performance (internal use).
531    ///
532    /// # Safety
533    ///
534    /// The caller must ensure that the sequence number is valid and that
535    /// they have exclusive access to the slot (e.g., via the Disruptor protocol).
536    #[allow(clippy::mut_from_ref)]
537    pub unsafe fn get_unchecked_mut(&self, sequence: i64) -> &mut T {
538        unsafe {
539            &mut *self
540                .buffer
541                .get_unchecked((sequence as usize) & self.mask)
542                .get()
543        }
544    }
545
546    /// Claims the next sequence in the ring buffer.
547    pub fn next(&self) -> i64 {
548        self.sequencer.next()
549    }
550
551    /// Publishes the sequence, making it available to consumers.
552    pub fn publish(&self, sequence: i64) {
553        self.sequencer.publish(sequence);
554    }
555}
556
/// Producer handle.
///
/// Returned by `Disruptor::start`; the public entry point for publishing
/// events into the ring buffer.
pub struct Producer<T> {
    // Shared ring buffer through which slots are claimed and published.
    ring_buffer: Arc<RingBuffer<T>>,
}
561
562impl<T> Producer<T> {
563    /// Publishes an event to the ring buffer.
564    pub fn publish<F>(&mut self, update: F)
565    where
566        F: FnOnce(&mut T),
567    {
568        let sequence = self.ring_buffer.next();
569        // SAFETY: We have claimed the sequence, so we have exclusive access to this slot.
570        let event = unsafe { self.ring_buffer.get_unchecked_mut(sequence) };
571        update(event);
572        self.ring_buffer.publish(sequence);
573    }
574}
575
/// Batch Event Processor.
///
/// A consumer loop that drains available events in batches and records its
/// progress in a `Sequence` used to gate producers.
pub struct BatchEventProcessor<T> {
    /// The ring buffer to read from.
    ring_buffer: Arc<RingBuffer<T>>,
    /// The sequence of this processor (last fully processed sequence).
    sequence: Arc<Sequence>,
    /// The barrier to wait on for newly published events.
    barrier: Arc<ProcessingSequenceBarrier>,
    /// The handler to process events.
    handler: Arc<dyn EventHandler<T>>,
}
587
impl<T> BatchEventProcessor<T> {
    /// Creates a new batch event processor whose own sequence starts at -1
    /// (nothing processed yet).
    pub fn new(
        ring_buffer: Arc<RingBuffer<T>>,
        barrier: Arc<ProcessingSequenceBarrier>,
        handler: Arc<dyn EventHandler<T>>,
    ) -> Self {
        Self {
            ring_buffer,
            sequence: Arc::new(Sequence::new(-1)),
            barrier,
            handler,
        }
    }

    /// Gets the sequence of the processor (shared so producers can gate on it).
    pub fn get_sequence(&self) -> Arc<Sequence> {
        self.sequence.clone()
    }

    /// Runs the processor loop until the barrier is alerted.
    ///
    /// Waits for the next batch of published events, hands each event to the
    /// handler (flagging the last one in the batch), then advances its own
    /// sequence so gated producers may reuse the consumed slots.
    pub fn run(&self) {
        let mut next_sequence = self.sequence.get() + 1;
        loop {
            match self.barrier.wait_for(next_sequence) {
                Ok(available_sequence) => {
                    // Process every contiguously available event as one batch.
                    while next_sequence <= available_sequence {
                        // SAFETY: The barrier guarantees that the sequence is available for reading.
                        let event = unsafe { self.ring_buffer.get_unchecked(next_sequence) };
                        self.handler.on_event(
                            event,
                            next_sequence as u64,
                            next_sequence == available_sequence,
                        );
                        next_sequence += 1;
                    }
                    // Announce progress only after the whole batch, releasing
                    // all of its slots to producers at once.
                    self.sequence.set(available_sequence);
                }
                Err(_) => {
                    // AlertException: exit only while the alert is still
                    // raised (it may have been cleared in the meantime).
                    if self.barrier.is_alerted() {
                        break;
                    }
                }
            }
        }
    }
}
635
/// Disruptor Facade.
///
/// Wires together the ring buffer, sequencer, wait strategy, and event
/// processors; constructed via [`DisruptorBuilder`].
pub struct Disruptor<T> {
    /// The ring buffer.
    ring_buffer: Arc<RingBuffer<T>>,
    /// The registered event processors (each spawned on a thread by `start`).
    processors: Vec<Arc<BatchEventProcessor<T>>>,
    /// Whether the disruptor has been started.
    started: bool,
    /// The wait strategy shared by the sequencer and all barriers.
    wait_strategy: Arc<dyn WaitStrategy>,
}

/// Type of producer the Disruptor is configured for.
pub enum ProducerType {
    /// Exactly one publishing thread (no atomic claim needed).
    Single,
    /// Multiple concurrent publishing threads.
    Multi,
}
652
/// Builder for [`Disruptor`] instances.
pub struct DisruptorBuilder<T, F> {
    /// Factory for creating (pre-filling) events.
    factory: F,
    /// Requested size of the ring buffer.
    buffer_size: usize,
    /// Wait strategy to use.
    wait_strategy: Arc<dyn WaitStrategy>,
    /// Type of producer (Single or Multi).
    producer_type: ProducerType,
    // Ties the otherwise-unused event type `T` to the builder.
    _marker: std::marker::PhantomData<T>,
}
664
665impl<T, F> DisruptorBuilder<T, F>
666where
667    F: Fn() -> T,
668{
669    /// Creates a new builder with the given factory.
670    pub fn new(factory: F) -> Self {
671        Self {
672            factory,
673            buffer_size: 1024,
674            wait_strategy: Arc::new(BusySpinWaitStrategy),
675            producer_type: ProducerType::Single,
676            _marker: std::marker::PhantomData,
677        }
678    }
679
680    /// Sets the buffer size.
681    pub fn buffer_size(mut self, size: usize) -> Self {
682        self.buffer_size = size;
683        self
684    }
685
686    /// Sets the wait strategy.
687    pub fn wait_strategy<W: WaitStrategy + 'static>(mut self, strategy: W) -> Self {
688        self.wait_strategy = Arc::new(strategy);
689        self
690    }
691
692    /// Sets the producer type to Single.
693    pub fn single_producer(mut self) -> Self {
694        self.producer_type = ProducerType::Single;
695        self
696    }
697
698    /// Sets the producer type to Multi.
699    pub fn multi_producer(mut self) -> Self {
700        self.producer_type = ProducerType::Multi;
701        self
702    }
703
704    /// Builds the Disruptor.
705    pub fn build(self) -> Disruptor<T> {
706        let sequencer: Arc<dyn Sequencer> = match self.producer_type {
707            ProducerType::Single => Arc::new(SingleProducerSequencer::new(
708                self.buffer_size,
709                self.wait_strategy.clone(),
710            )),
711            ProducerType::Multi => Arc::new(MultiProducerSequencer::new(
712                self.buffer_size,
713                self.wait_strategy.clone(),
714            )),
715        };
716
717        let ring_buffer = Arc::new(RingBuffer::new(self.factory, self.buffer_size, sequencer));
718        Disruptor {
719            ring_buffer,
720            processors: Vec::new(),
721            started: false,
722            wait_strategy: self.wait_strategy,
723        }
724    }
725}
726
727impl<T: Send + Sync + 'static> Disruptor<T> {
728    /// Creates a new builder for the Disruptor.
729    pub fn builder<F>(factory: F) -> DisruptorBuilder<T, F>
730    where
731        F: Fn() -> T,
732    {
733        DisruptorBuilder::new(factory)
734    }
735
736    /// Registers an event handler.
737    pub fn handle_events_with<H: EventHandler<T> + 'static>(&mut self, handler: H) -> &mut Self {
738        // Create a barrier that waits on the ring buffer's sequencer.
739        // Initially, the barrier depends on the sequencer itself (producers).
740
741        let barrier = Arc::new(ProcessingSequenceBarrier::new(
742            self.wait_strategy.clone(),
743            self.ring_buffer.sequencer.clone(), // Dependent on cursor (producer)
744            self.ring_buffer.sequencer.clone(),
745        ));
746
747        let processor = Arc::new(BatchEventProcessor::new(
748            self.ring_buffer.clone(),
749            barrier,
750            Arc::new(handler),
751        ));
752
753        self.processors.push(processor);
754        self
755    }
756
757    /// Starts the Disruptor and returns a Producer.
758    pub fn start(mut self) -> Producer<T> {
759        let mut gating_sequences = Vec::new();
760        for processor in &self.processors {
761            gating_sequences.push(processor.get_sequence());
762            let p = processor.clone();
763            thread::spawn(move || {
764                p.run();
765            });
766        }
767
768        // SAFETY: We have exclusive ownership and no publishers yet.
769        // Safe to mutate RingBuffer to add gating sequences.
770        // RingBuffer::add_gating_sequences is now safe (interior mutability in Sequencer)
771        self.ring_buffer.add_gating_sequences(gating_sequences);
772
773        self.started = true;
774
775        Producer {
776            ring_buffer: self.ring_buffer,
777        }
778    }
779}