1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
//! shared state between MPSC producers and consumer.
use crate::ringbuffer::RingBuffer;
use cpu::{CachePadded, Cursor, fence_acquire, fence_release};
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
/// width of one availability word in bits; each `AtomicU64` in the bitmap
/// tracks this many ring slots.
pub(super) const BITS_PER_WORD: usize = u64::BITS as usize;
/// state shared by all producers and the single consumer of the channel.
pub(super) struct Shared<T> {
    /// ring storage holding the events themselves.
    pub(super) buffer: RingBuffer<T>,
    /// claim cursor. `highest_published` and `Drop` both treat its value as an
    /// inclusive upper bound, i.e. the highest sequence claimed by a producer so far.
    pub(super) claim_cursor: Cursor,
    /// consumer cursor: the last sequence the consumer has finished with.
    /// `Drop` treats every sequence after it as possibly still holding an event.
    pub(super) consumer_cursor: Cursor,
    /// publication bitmap, one bit per ring slot, scanned directly by the consumer.
    /// publishing a sequence flips its bit, so a sequence counts as published when
    /// its bit equals the parity of `sequence / capacity` (its round).
    /// NOTE(review): for round 0 to read as unpublished the bits must start at 1;
    /// the initialisation is not visible here — confirm in the constructor.
    pub(super) availability: Box<[CachePadded<AtomicU64>]>,
    /// number of `AtomicU64` words in `availability`; word indices wrap modulo this.
    pub(super) availability_words: usize,
    /// count of currently active producers.
    pub(super) producer_count: CachePadded<AtomicUsize>,
    /// flag recording whether the channel has been closed.
    pub(super) closed: CachePadded<AtomicBool>,
    /// number of slots in `buffer` (a power of two).
    pub(super) capacity: usize,
}
impl<T> Drop for Shared<T> {
    /// destroys events that were published but never consumed, so element types
    /// with their own `Drop` impl are not leaked when the shared state goes away.
    ///
    /// every sequence after the consumer cursor, up to and including the claim
    /// cursor, is checked against the availability bitmap; slots that were claimed
    /// but never published are skipped because they hold no initialised value.
    fn drop(&mut self) {
        let consumed_through = self.consumer_cursor.value();
        let claimed_through = self.claim_cursor.value();
        // nothing was claimed beyond what the consumer already took
        if claimed_through <= consumed_through {
            return;
        }
        for seq in (consumed_through + 1)..=claimed_through {
            if !self.is_published(seq) {
                continue;
            }
            // SAFETY: the slot for `seq` was written and published by a producer
            // and never consumed, so it holds a live value; `&mut self` in `drop`
            // guarantees no other thread can touch the buffer concurrently.
            unsafe {
                core::ptr::drop_in_place(self.buffer.get_ptr_mut(seq));
            }
        }
    }
}
impl<T> Shared<T> {
    /// index of the bitmap word that holds `sequence`'s availability bit.
    ///
    /// NOTE(review): word `(seq / 64) % availability_words`, bit `seq % 64` only maps
    /// one-to-one onto ring slots when `capacity` is a multiple of 64 and
    /// `availability_words == capacity / 64` — confirm the constructor enforces this.
    #[inline]
    pub(super) fn word_index(&self, sequence: i64) -> usize {
        let absolute_word = sequence as usize / BITS_PER_WORD;
        absolute_word % self.availability_words
    }

    /// position of `sequence`'s availability bit inside its word.
    #[inline]
    pub(super) fn bit_index(&self, sequence: i64) -> usize {
        let position = sequence as usize;
        position % BITS_PER_WORD
    }

    /// reports whether `sequence` has been published in its current round.
    ///
    /// the bitmap is read with a Relaxed load; a caller that goes on to read the
    /// slot's data after a `true` result must issue an acquire fence first.
    #[inline]
    pub(super) fn is_published(&self, sequence: i64) -> bool {
        let word_slot = self.word_index(sequence);
        let shift = self.bit_index(sequence);
        // Relaxed: the caller pairs this with fence_acquire() on success
        let bits = self.availability[word_slot].load(Ordering::Relaxed);
        let stored = (bits >> shift) & 1;
        // every publish flips the bit, so its value tracks the parity of the round
        let round_parity = ((sequence as usize / self.capacity) & 1) as u64;
        stored == round_parity
    }

    /// publishes a single `sequence` by flipping its availability bit.
    ///
    /// a release fence is issued before the flip so the caller's write to the slot
    /// is ordered ahead of it, which lets the flip itself use Relaxed ordering.
    #[inline]
    pub(super) fn mark_published(&self, sequence: i64) {
        let word_slot = self.word_index(sequence);
        let flip = 1u64 << self.bit_index(sequence);
        fence_release();
        self.availability[word_slot].fetch_xor(flip, Ordering::Relaxed);
    }

    /// publishes `count` consecutive sequences starting at `start`.
    ///
    /// bits belonging to the same bitmap word are collected into one mask and
    /// applied with a single atomic XOR, instead of one XOR per sequence.
    ///
    /// # note
    ///
    /// the XORs use Relaxed ordering: callers must issue a `fence(Release)` before
    /// calling this method so their slot writes are visible to the consumer.
    #[inline]
    pub(super) fn mark_published_batch(&self, start: i64, count: usize) {
        if count == 0 {
            return;
        }
        // apply the bits gathered for one word (Relaxed: caller already fenced)
        let flush = |word_slot: usize, bits: u64| {
            if bits != 0 {
                self.availability[word_slot].fetch_xor(bits, Ordering::Relaxed);
            }
        };
        let mut pending_word = self.word_index(start);
        let mut pending_bits = 0u64;
        for offset in 0..count {
            let seq = start + offset as i64;
            let word_slot = self.word_index(seq);
            if word_slot != pending_word {
                // crossed into another word: emit what we have for the previous one
                flush(pending_word, pending_bits);
                pending_word = word_slot;
                pending_bits = 0;
            }
            pending_bits |= 1u64 << self.bit_index(seq);
        }
        flush(pending_word, pending_bits);
    }

    /// bit pattern a fully published word has during `sequence`'s round.
    ///
    /// rounds advance every `capacity` sequences (one full ring wrap, not one
    /// bitmap wrap): even rounds expect all zeros, odd rounds expect all ones.
    #[inline]
    fn expected_pattern(&self, sequence: i64) -> u64 {
        let odd_round = (sequence as usize / self.capacity) % 2 == 1;
        if odd_round {
            !0u64
        } else {
            0
        }
    }

    /// finds the end of the contiguous run of published sequences starting at `from`.
    ///
    /// returns the last sequence `s` such that every sequence in `from..=s` reads
    /// as published, or `from - 1` when `from` itself is unpublished or lies beyond
    /// the claim cursor. the bitmap is scanned one word (up to 64 sequences) per
    /// iteration with Relaxed loads, followed by a single acquire fence, so the
    /// buffer writes of the reported sequences are visible to the caller.
    /// because whole words are checked at once, the result can exceed the claim
    /// value sampled on entry when the bits past it already read as published.
    pub(super) fn highest_published(&self, from: i64) -> i64 {
        let claim = self.claim_cursor.value_relaxed();
        if from > claim {
            // nothing claimed at or after `from` yet
            return from - 1;
        }
        let mut cursor = from;
        let mut word_slot = self.word_index(cursor);
        let mut offset = self.bit_index(cursor);
        let highest = loop {
            if cursor > claim {
                break cursor - 1;
            }
            // Relaxed while scanning; one fence after the loop covers every load
            let bits = self.availability[word_slot].load(Ordering::Relaxed);
            // after XOR with the round's pattern, published bits are 0 and
            // unpublished bits are 1; ignore bits below our starting offset
            let unpublished = (bits ^ self.expected_pattern(cursor)) & (!0u64 << offset);
            if unpublished != 0 {
                let gap = unpublished.trailing_zeros() as usize - offset;
                break cursor + gap as i64 - 1;
            }
            // the rest of this word is published: continue at the next word
            cursor += (BITS_PER_WORD - offset) as i64;
            word_slot = (word_slot + 1) % self.availability_words;
            offset = 0;
        };
        // make the buffer writes behind every reported sequence visible
        fence_acquire();
        highest
    }
}