tycho_block_util/state/
min_ref_mc_state.rs1use std::collections::hash_map;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU32, Ordering};
4
5use tycho_types::models::ShardStateUnsplit;
6use tycho_util::FastHashMap;
7
/// Name of the gauge that is updated with the tracker's current minimum
/// `mc_seqno` (reported as `0` when nothing is tracked).
const METRIC_MIN_REF_MC_SEQNO: &str = "tycho_min_ref_mc_seqno";
10
11#[derive(Clone, Default)]
12#[repr(transparent)]
13pub struct MinRefMcStateTracker {
14 inner: Arc<Inner>,
15}
16
17impl MinRefMcStateTracker {
18 #[inline]
19 pub fn new() -> Self {
20 Self {
21 inner: Arc::new(Inner::default()),
22 }
23 }
24
25 pub fn seqno(&self) -> Option<u32> {
26 self.inner.counters.read().min_seqno
27 }
28
29 pub fn insert(&self, state: &ShardStateUnsplit) -> RefMcStateHandle {
30 if state.seqno == 0 || state.min_ref_mc_seqno == u32::MAX {
31 self.insert_untracked()
35 } else {
36 self.insert_seqno(state.min_ref_mc_seqno)
37 }
38 }
39
40 pub fn insert_seqno(&self, mc_seqno: u32) -> RefMcStateHandle {
41 self.inner.insert(mc_seqno)
42 }
43
44 pub fn insert_untracked(&self) -> RefMcStateHandle {
45 RefMcStateHandle(Arc::new(HandleInner {
46 min_ref_mc_state: self.inner.clone(),
47 mc_seqno: None,
48 }))
49 }
50
51 #[inline]
52 fn wrap(inner: &Arc<Inner>) -> &Self {
53 unsafe { &*(inner as *const Arc<Inner>).cast::<Self>() }
55 }
56}
57
58#[derive(Clone)]
59#[repr(transparent)]
60pub struct RefMcStateHandle(Arc<HandleInner>);
61
62impl RefMcStateHandle {
63 pub fn min_safe<'a>(&'a self, other: &'a Self) -> &'a Self {
64 match (self.0.mc_seqno, other.0.mc_seqno) {
65 (_, None) => self,
67 (None, Some(_)) => other,
68 (Some(this_seqno), Some(other_seqno)) => {
70 if other_seqno < this_seqno {
71 other
72 } else {
73 self
74 }
75 }
76 }
77 }
78
79 pub fn tracker(&self) -> &MinRefMcStateTracker {
80 MinRefMcStateTracker::wrap(&self.0.min_ref_mc_state)
81 }
82}
83
84impl std::fmt::Debug for RefMcStateHandle {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 f.debug_struct("RefMcStateHandle")
87 .field("mc_seqno", &self.0.mc_seqno)
88 .finish()
89 }
90}
91
92#[derive(Default)]
93struct Inner {
94 counters: parking_lot::RwLock<StateIds>,
95}
96
97impl Inner {
98 fn insert(self: &Arc<Self>, mc_seqno: u32) -> RefMcStateHandle {
99 let counters = self.counters.read();
101 if let Some(counter) = counters.refs.get(&mc_seqno) {
102 counter.fetch_add(1, Ordering::Release);
103 return RefMcStateHandle(Arc::new(HandleInner {
104 min_ref_mc_state: self.clone(),
105 mc_seqno: Some(mc_seqno),
106 }));
107 }
108 drop(counters);
109
110 let mut counters = self.counters.write();
112 match counters.refs.entry(mc_seqno) {
113 hash_map::Entry::Vacant(entry) => {
114 entry.insert(AtomicU32::new(1));
115
116 match &mut counters.min_seqno {
117 Some(seqno) if mc_seqno < *seqno => *seqno = mc_seqno,
118 None => counters.min_seqno = Some(mc_seqno),
119 _ => {}
120 }
121 }
122 hash_map::Entry::Occupied(entry) => {
123 entry.get().fetch_add(1, Ordering::Release);
124 }
125 }
126
127 let min_seqno = counters.min_seqno;
128 drop(counters);
129
130 metrics::gauge!(METRIC_MIN_REF_MC_SEQNO).set(min_seqno.unwrap_or_default());
131
132 RefMcStateHandle(Arc::new(HandleInner {
133 min_ref_mc_state: self.clone(),
134 mc_seqno: Some(mc_seqno),
135 }))
136 }
137
138 fn remove(&self, mc_seqno: u32) {
139 let counters = self.counters.read();
141 if let Some(counter) = counters.refs.get(&mc_seqno) {
142 if counter.fetch_sub(1, Ordering::AcqRel) > 1 {
143 return;
144 }
145 } else {
146 return;
147 }
148 drop(counters);
149
150 let mut counters = self.counters.write();
152 match counters.refs.entry(mc_seqno) {
153 hash_map::Entry::Occupied(entry) if entry.get().load(Ordering::Acquire) == 0 => {
154 entry.remove();
155 if matches!(counters.min_seqno, Some(seqno) if seqno == mc_seqno) {
156 counters.min_seqno = counters.refs.keys().min().copied();
157 }
158 }
159 _ => {}
160 }
161
162 let min_seqno = counters.min_seqno;
163 drop(counters);
164
165 metrics::gauge!(METRIC_MIN_REF_MC_SEQNO).set(min_seqno.unwrap_or_default());
166 }
167}
168
169struct HandleInner {
170 min_ref_mc_state: Arc<Inner>,
171 mc_seqno: Option<u32>,
172}
173
174impl Drop for HandleInner {
175 fn drop(&mut self) {
176 if let Some(mc_seqno) = self.mc_seqno {
177 self.min_ref_mc_state.remove(mc_seqno);
178 }
179 }
180}
181
182#[derive(Default)]
183struct StateIds {
184 min_seqno: Option<u32>,
185 refs: FastHashMap<u32, AtomicU32>,
186}
187
#[cfg(test)]
mod tests {
    use super::*;

    /// The reported minimum follows handle creation and destruction.
    #[test]
    fn min_ref_mc_state() {
        let tracker = MinRefMcStateTracker::new();

        // A single handle is tracked only while it is alive.
        let lone = tracker.insert_seqno(10);
        assert_eq!(tracker.seqno(), Some(10));
        drop(lone);
        assert_eq!(tracker.seqno(), None);

        // Several handles, two of them sharing the same seqno.
        let first_ten = tracker.insert_seqno(10);
        assert_eq!(tracker.seqno(), Some(10));
        let fifteen = tracker.insert_seqno(15);
        assert_eq!(tracker.seqno(), Some(10));
        let second_ten = tracker.insert_seqno(10);
        assert_eq!(tracker.seqno(), Some(10));

        // Dropping one of the duplicates keeps the seqno tracked.
        drop(second_ten);
        assert_eq!(tracker.seqno(), Some(10));

        // Dropping the last reference to 10 moves the minimum to 15.
        drop(first_ten);
        assert_eq!(tracker.seqno(), Some(15));

        drop(fifteen);
        assert_eq!(tracker.seqno(), None);
    }
}