//! tycho_block_util/state/min_ref_mc_state.rs

1use std::collections::hash_map;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU32, Ordering};
4
5use tycho_types::models::ShardStateUnsplit;
6use tycho_util::FastHashMap;
7
// Gauges
/// Gauge reporting the lowest masterchain seqno still referenced by any live
/// handle (0 when nothing is tracked — see `Inner::insert`/`remove`).
const METRIC_MIN_REF_MC_SEQNO: &str = "tycho_min_ref_mc_seqno";
10
/// Shared tracker of the minimum masterchain seqno referenced by cached shard
/// states. Cloning is cheap (an `Arc` bump); all clones share one counter set.
#[derive(Clone, Default)]
// NOTE: `repr(transparent)` is required for the soundness of
// `MinRefMcStateTracker::wrap`, which casts `&Arc<Inner>` to `&Self`.
#[repr(transparent)]
pub struct MinRefMcStateTracker {
    inner: Arc<Inner>,
}
16
impl MinRefMcStateTracker {
    /// Creates a tracker with no tracked states.
    #[inline]
    pub fn new() -> Self {
        Self {
            inner: Arc::new(Inner::default()),
        }
    }

    /// Returns the minimum masterchain seqno currently pinned by any live
    /// handle, or `None` when nothing is tracked.
    pub fn seqno(&self) -> Option<u32> {
        self.inner.counters.read().min_seqno
    }

    /// Registers `state` and returns a handle that pins its referenced
    /// masterchain seqno until the handle is dropped.
    pub fn insert(&self, state: &ShardStateUnsplit) -> RefMcStateHandle {
        if state.seqno == 0 || state.min_ref_mc_seqno == u32::MAX {
            // Insert zerostates as untracked states to prevent their cache
            // from holding back the global archives GC. The returned handle
            // still points to the shared tracker, but does not touch any
            // refcount.
            self.insert_untracked()
        } else {
            self.insert_seqno(state.min_ref_mc_seqno)
        }
    }

    /// Pins `mc_seqno` directly; the returned handle releases it on drop.
    pub fn insert_seqno(&self, mc_seqno: u32) -> RefMcStateHandle {
        self.inner.insert(mc_seqno)
    }

    /// Returns a handle that pins nothing (`mc_seqno: None`), so dropping it
    /// has no effect on the tracked minimum.
    pub fn insert_untracked(&self) -> RefMcStateHandle {
        RefMcStateHandle(Arc::new(HandleInner {
            min_ref_mc_state: self.inner.clone(),
            mc_seqno: None,
        }))
    }

    /// Reinterprets a borrowed `Arc<Inner>` as a borrowed tracker without
    /// cloning the `Arc`.
    #[inline]
    fn wrap(inner: &Arc<Inner>) -> &Self {
        // SAFETY: `MinRefMcStateTracker` has the same memory layout as `Arc<Inner>`
        // (the struct is `#[repr(transparent)]` over that single field).
        unsafe { &*(inner as *const Arc<Inner>).cast::<Self>() }
    }
}
57
/// RAII guard that keeps one masterchain seqno reference alive in the shared
/// tracker; the reference is released when the last clone is dropped
/// (see `Drop for HandleInner`).
#[derive(Clone)]
#[repr(transparent)]
pub struct RefMcStateHandle(Arc<HandleInner>);
61
62impl RefMcStateHandle {
63    pub fn min_safe<'a>(&'a self, other: &'a Self) -> &'a Self {
64        match (self.0.mc_seqno, other.0.mc_seqno) {
65            // Tracked seqno is safer.
66            (_, None) => self,
67            (None, Some(_)) => other,
68            // Lower seqno is safer.
69            (Some(this_seqno), Some(other_seqno)) => {
70                if other_seqno < this_seqno {
71                    other
72                } else {
73                    self
74                }
75            }
76        }
77    }
78
79    pub fn tracker(&self) -> &MinRefMcStateTracker {
80        MinRefMcStateTracker::wrap(&self.0.min_ref_mc_state)
81    }
82}
83
84impl std::fmt::Debug for RefMcStateHandle {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        f.debug_struct("RefMcStateHandle")
87            .field("mc_seqno", &self.0.mc_seqno)
88            .finish()
89    }
90}
91
/// Shared state behind a tracker: the per-seqno refcounts and cached minimum,
/// guarded by a read/write lock.
#[derive(Default)]
struct Inner {
    counters: parking_lot::RwLock<StateIds>,
}
96
impl Inner {
    /// Increments the refcount for `mc_seqno` (creating it on first use),
    /// updates the cached minimum and the metrics gauge, and returns a handle
    /// whose drop will call `remove`.
    fn insert(self: &Arc<Self>, mc_seqno: u32) -> RefMcStateHandle {
        // Fast path, just increase existing counter.
        // Only a shared read lock is needed: the counter itself is atomic,
        // and bumping an existing entry cannot change `min_seqno`.
        let counters = self.counters.read();
        if let Some(counter) = counters.refs.get(&mc_seqno) {
            counter.fetch_add(1, Ordering::Release);
            return RefMcStateHandle(Arc::new(HandleInner {
                min_ref_mc_state: self.clone(),
                mc_seqno: Some(mc_seqno),
            }));
        }
        drop(counters);

        // Fallback to exclusive write: the entry must be created and the
        // minimum possibly lowered.
        let mut counters = self.counters.write();
        match counters.refs.entry(mc_seqno) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(AtomicU32::new(1));

                // A newly tracked seqno can only lower (or initialize) the min.
                match &mut counters.min_seqno {
                    Some(seqno) if mc_seqno < *seqno => *seqno = mc_seqno,
                    None => counters.min_seqno = Some(mc_seqno),
                    _ => {}
                }
            }
            // Another thread created the entry between our read unlock and
            // write lock; just bump its counter.
            hash_map::Entry::Occupied(entry) => {
                entry.get().fetch_add(1, Ordering::Release);
            }
        }

        let min_seqno = counters.min_seqno;
        drop(counters);

        // Gauge is updated outside the lock; reports 0 when nothing tracked.
        metrics::gauge!(METRIC_MIN_REF_MC_SEQNO).set(min_seqno.unwrap_or_default());

        RefMcStateHandle(Arc::new(HandleInner {
            min_ref_mc_state: self.clone(),
            mc_seqno: Some(mc_seqno),
        }))
    }

    /// Decrements the refcount for `mc_seqno`; when it reaches zero the entry
    /// is removed, the cached minimum recomputed, and the gauge updated.
    /// Called from `Drop for HandleInner`.
    fn remove(&self, mc_seqno: u32) {
        // Fast path, just decrease existing counter.
        let counters = self.counters.read();
        if let Some(counter) = counters.refs.get(&mc_seqno) {
            // Previous value > 1 means other refs remain: nothing else to do.
            if counter.fetch_sub(1, Ordering::AcqRel) > 1 {
                return;
            }
        } else {
            // NOTE(review): unknown seqno is silently ignored — presumably
            // unreachable since handles are only created via `insert`.
            return;
        }
        drop(counters);

        // Fallback to exclusive write to update current min.
        let mut counters = self.counters.write();
        match counters.refs.entry(mc_seqno) {
            // Re-check under the write lock: a concurrent `insert` may have
            // revived the counter (0 -> 1) after we dropped it to zero above,
            // in which case the entry must survive.
            hash_map::Entry::Occupied(entry) if entry.get().load(Ordering::Acquire) == 0 => {
                entry.remove();
                // If the removed entry was the minimum, rescan remaining keys.
                if matches!(counters.min_seqno, Some(seqno) if seqno == mc_seqno) {
                    counters.min_seqno = counters.refs.keys().min().copied();
                }
            }
            _ => {}
        }

        let min_seqno = counters.min_seqno;
        drop(counters);

        metrics::gauge!(METRIC_MIN_REF_MC_SEQNO).set(min_seqno.unwrap_or_default());
    }
}
168
/// Shared payload of a `RefMcStateHandle`: the tracker it belongs to and the
/// seqno it pins (`None` for untracked handles).
struct HandleInner {
    min_ref_mc_state: Arc<Inner>,
    mc_seqno: Option<u32>,
}
173
174impl Drop for HandleInner {
175    fn drop(&mut self) {
176        if let Some(mc_seqno) = self.mc_seqno {
177            self.min_ref_mc_state.remove(mc_seqno);
178        }
179    }
180}
181
/// Lock-protected counter table: per-seqno atomic refcounts plus the cached
/// minimum key (kept in sync by `Inner::insert`/`remove`).
#[derive(Default)]
struct StateIds {
    // Cached `refs.keys().min()`; `None` iff `refs` is empty.
    min_seqno: Option<u32>,
    refs: FastHashMap<u32, AtomicU32>,
}
187
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn min_ref_mc_state() {
        let tracker = MinRefMcStateTracker::new();

        // A single handle pins its seqno; dropping it clears the minimum.
        let handle = tracker.insert_seqno(10);
        assert_eq!(tracker.seqno(), Some(10));
        drop(handle);
        assert_eq!(tracker.seqno(), None);

        // Overlapping handles: the minimum survives until the last handle
        // at that seqno is dropped, then moves to the next lowest.
        let first_at_10 = tracker.insert_seqno(10);
        assert_eq!(tracker.seqno(), Some(10));
        let at_15 = tracker.insert_seqno(15);
        assert_eq!(tracker.seqno(), Some(10));
        let second_at_10 = tracker.insert_seqno(10);
        assert_eq!(tracker.seqno(), Some(10));
        drop(second_at_10);
        assert_eq!(tracker.seqno(), Some(10));
        drop(first_at_10);
        assert_eq!(tracker.seqno(), Some(15));
        drop(at_15);
        assert_eq!(tracker.seqno(), None);
    }
}
216}