// heliosdb_proxy/lag/ryw.rs

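//! Read-Your-Writes (RYW) Consistency Tracker
//!
//! Tracks the latest write LSN per session so that subsequent reads can be
//! routed to replicas that have already replayed the session's own writes.
//! Also provides per-workflow consistency points for multi-step workflows.
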
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::{Duration, Instant};
8
9use dashmap::DashMap;
10
/// Identifier used as the key for per-session read-your-writes state.
pub type SessionId = std::string::String;
13
/// Session RYW data
///
/// Holds the highest LSN a session has written, when its most recent write
/// was recorded, and how many writes have been recorded for it.
#[derive(Debug)]
pub struct RywSession {
    /// Highest write LSN recorded for this session
    pub last_write_lsn: AtomicU64,

    /// When the most recent write was recorded; retention is measured from here
    pub written_at: Instant,

    /// Number of writes tracked
    pub write_count: AtomicU64,
}

impl RywSession {
    /// Start tracking a session whose first write is at `lsn`.
    fn new(lsn: u64) -> Self {
        Self {
            last_write_lsn: AtomicU64::new(lsn),
            written_at: Instant::now(),
            write_count: AtomicU64::new(1),
        }
    }

    /// Record another write at `lsn`.
    ///
    /// The stored LSN only ever moves forward: a lower, out-of-order LSN is
    /// ignored. The write timestamp is always refreshed. Otherwise a session
    /// that keeps writing would have its requirement expire `retention` after
    /// its *first* write, even though it wrote moments ago.
    fn update(&mut self, lsn: u64) {
        // Single atomic max instead of a separate load + compare + store.
        self.last_write_lsn.fetch_max(lsn, Ordering::Relaxed);
        self.written_at = Instant::now();
        self.write_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Whether more than `retention` has elapsed since `written_at`.
    fn is_expired(&self, retention: Duration) -> bool {
        self.written_at.elapsed() > retention
    }

    /// Highest write LSN recorded so far.
    fn get_lsn(&self) -> u64 {
        self.last_write_lsn.load(Ordering::Relaxed)
    }
}
53
54/// Read-Your-Writes Tracker
55///
56/// Tracks the last write LSN per session to ensure reads
57/// are routed to replicas that have replayed past that point.
58pub struct ReadYourWritesTracker {
59    /// Session -> RYW data
60    sessions: DashMap<SessionId, RywSession>,
61
62    /// How long to retain LSN requirements
63    retention: Duration,
64
65    /// Last cleanup time
66    last_cleanup: parking_lot::Mutex<Instant>,
67
68    /// Cleanup interval
69    cleanup_interval: Duration,
70}
71
72impl ReadYourWritesTracker {
73    /// Create a new RYW tracker
74    pub fn new(retention: Duration) -> Self {
75        Self {
76            sessions: DashMap::new(),
77            retention,
78            last_cleanup: parking_lot::Mutex::new(Instant::now()),
79            cleanup_interval: Duration::from_secs(60),
80        }
81    }
82
83    /// Create with default retention (5 minutes)
84    pub fn with_defaults() -> Self {
85        Self::new(Duration::from_secs(300))
86    }
87
88    /// Record that a session wrote at this LSN
89    pub fn record_write(&self, session_id: &str, lsn: u64) {
90        self.maybe_cleanup();
91
92        self.sessions
93            .entry(session_id.to_string())
94            .and_modify(|session| session.update(lsn))
95            .or_insert_with(|| RywSession::new(lsn));
96    }
97
98    /// Get the required LSN for read-your-writes
99    ///
100    /// Returns None if no writes recorded or requirement expired
101    pub fn get_required_lsn(&self, session_id: &str) -> Option<u64> {
102        self.sessions.get(session_id).and_then(|session| {
103            if session.is_expired(self.retention) {
104                None
105            } else {
106                Some(session.get_lsn())
107            }
108        })
109    }
110
111    /// Check if a session has a pending RYW requirement
112    pub fn has_requirement(&self, session_id: &str) -> bool {
113        self.get_required_lsn(session_id).is_some()
114    }
115
116    /// Clear LSN requirement (after successful read that satisfied it)
117    pub fn clear(&self, session_id: &str) {
118        self.sessions.remove(session_id);
119    }
120
121    /// Clear all requirements for a session
122    pub fn clear_session(&self, session_id: &str) {
123        self.sessions.remove(session_id);
124    }
125
126    /// Get number of tracked sessions
127    pub fn session_count(&self) -> usize {
128        self.sessions.len()
129    }
130
131    /// Get session info
132    pub fn get_session_info(&self, session_id: &str) -> Option<(u64, Duration, u64)> {
133        self.sessions.get(session_id).map(|session| {
134            (
135                session.get_lsn(),
136                session.written_at.elapsed(),
137                session.write_count.load(Ordering::Relaxed),
138            )
139        })
140    }
141
142    /// Perform cleanup of expired sessions (if due)
143    fn maybe_cleanup(&self) {
144        let mut last_cleanup = self.last_cleanup.lock();
145        if last_cleanup.elapsed() < self.cleanup_interval {
146            return;
147        }
148
149        // Remove expired sessions
150        self.sessions
151            .retain(|_, session| !session.is_expired(self.retention));
152
153        *last_cleanup = Instant::now();
154    }
155
156    /// Force cleanup now
157    pub fn cleanup(&self) {
158        self.sessions
159            .retain(|_, session| !session.is_expired(self.retention));
160        *self.last_cleanup.lock() = Instant::now();
161    }
162
163    /// Clear all tracked sessions
164    pub fn clear_all(&self) {
165        self.sessions.clear();
166    }
167}
168
169impl std::fmt::Debug for ReadYourWritesTracker {
170    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171        f.debug_struct("ReadYourWritesTracker")
172            .field("session_count", &self.sessions.len())
173            .field("retention", &self.retention)
174            .finish()
175    }
176}
177
/// Workflow Consistency Tracker
///
/// For multi-step workflows that need coordinated consistency
/// across multiple operations. It remembers the highest LSN seen so far
/// (the start LSN or any recorded write), which reads in the workflow must
/// wait for.
#[derive(Debug)]
pub struct WorkflowConsistency {
    /// Workflow ID
    workflow_id: String,

    /// LSN at workflow start
    start_lsn: u64,

    /// Current consistency point (highest of the start LSN and any write LSN)
    consistency_point: AtomicU64,

    /// When the workflow started
    started_at: Instant,

    /// Number of writes recorded via `record_write`
    step_count: AtomicU64,
}

impl WorkflowConsistency {
    /// Begin a new workflow anchored at `current_lsn`.
    pub fn begin(workflow_id: &str, current_lsn: u64) -> Self {
        Self {
            workflow_id: workflow_id.to_string(),
            start_lsn: current_lsn,
            consistency_point: AtomicU64::new(current_lsn),
            started_at: Instant::now(),
            step_count: AtomicU64::new(0),
        }
    }

    /// Record a write in this workflow.
    ///
    /// This takes `&self` and may be called from several threads at once.
    /// The consistency point is raised with one atomic `fetch_max`, so it
    /// never moves backwards. A separate load followed by a store could let
    /// a concurrent, lower LSN overwrite a higher one.
    pub fn record_write(&self, write_lsn: u64) {
        self.consistency_point.fetch_max(write_lsn, Ordering::Relaxed);
        self.step_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Get the required LSN for consistent reads in this workflow
    pub fn get_read_lsn_requirement(&self) -> u64 {
        self.consistency_point.load(Ordering::Relaxed)
    }

    /// Get workflow ID
    pub fn workflow_id(&self) -> &str {
        &self.workflow_id
    }

    /// Get start LSN
    pub fn start_lsn(&self) -> u64 {
        self.start_lsn
    }

    /// Get workflow duration (time since `begin`)
    pub fn duration(&self) -> Duration {
        self.started_at.elapsed()
    }

    /// Get step count (number of `record_write` calls)
    pub fn step_count(&self) -> u64 {
        self.step_count.load(Ordering::Relaxed)
    }

    /// Check if workflow has advanced (a write above the start LSN occurred)
    pub fn has_writes(&self) -> bool {
        self.consistency_point.load(Ordering::Relaxed) > self.start_lsn
    }
}
252
253/// Multi-workflow tracker
254pub struct WorkflowTracker {
255    workflows: DashMap<String, WorkflowConsistency>,
256    max_age: Duration,
257}
258
259impl WorkflowTracker {
260    /// Create a new workflow tracker
261    pub fn new(max_age: Duration) -> Self {
262        Self {
263            workflows: DashMap::new(),
264            max_age,
265        }
266    }
267
268    /// Begin a new workflow
269    pub fn begin_workflow(&self, workflow_id: &str, current_lsn: u64) {
270        self.workflows.insert(
271            workflow_id.to_string(),
272            WorkflowConsistency::begin(workflow_id, current_lsn),
273        );
274    }
275
276    /// Record a write in a workflow
277    pub fn record_write(&self, workflow_id: &str, write_lsn: u64) {
278        if let Some(workflow) = self.workflows.get(workflow_id) {
279            workflow.record_write(write_lsn);
280        }
281    }
282
283    /// Get read LSN requirement for a workflow
284    pub fn get_read_requirement(&self, workflow_id: &str) -> Option<u64> {
285        self.workflows
286            .get(workflow_id)
287            .map(|w| w.get_read_lsn_requirement())
288    }
289
290    /// End a workflow
291    pub fn end_workflow(&self, workflow_id: &str) {
292        self.workflows.remove(workflow_id);
293    }
294
295    /// Cleanup expired workflows
296    pub fn cleanup(&self) {
297        self.workflows
298            .retain(|_, workflow| workflow.duration() < self.max_age);
299    }
300
301    /// Get active workflow count
302    pub fn workflow_count(&self) -> usize {
303        self.workflows.len()
304    }
305}
306
307impl Default for WorkflowTracker {
308    fn default() -> Self {
309        Self::new(Duration::from_secs(3600)) // 1 hour default
310    }
311}
312
#[cfg(test)]
mod tests {
    use super::*;

    /// Tracker with the default retention, shared by most RYW tests.
    fn default_tracker() -> ReadYourWritesTracker {
        ReadYourWritesTracker::with_defaults()
    }

    #[test]
    fn test_ryw_tracker_basic() {
        let ryw = default_tracker();

        ryw.record_write("session-1", 1000);

        // The write must now show up as a requirement at its exact LSN.
        assert!(ryw.has_requirement("session-1"));
        assert_eq!(ryw.get_required_lsn("session-1"), Some(1000));
    }

    #[test]
    fn test_ryw_tracker_updates() {
        let ryw = default_tracker();

        for lsn in [1000, 1500] {
            ryw.record_write("session-1", lsn);
        }
        // The higher of the two LSNs wins.
        assert_eq!(ryw.get_required_lsn("session-1"), Some(1500));

        // A later but lower LSN must not pull the requirement back.
        ryw.record_write("session-1", 1200);
        assert_eq!(ryw.get_required_lsn("session-1"), Some(1500));
    }

    #[test]
    fn test_ryw_tracker_clear() {
        let ryw = default_tracker();

        ryw.record_write("session-1", 1000);
        assert!(ryw.has_requirement("session-1"));

        ryw.clear("session-1");
        assert!(!ryw.has_requirement("session-1"));
        assert_eq!(ryw.get_required_lsn("session-1"), None);
    }

    #[test]
    fn test_ryw_tracker_multiple_sessions() {
        let ryw = default_tracker();
        let writes = [("session-1", 1000), ("session-2", 2000), ("session-3", 500)];

        for (id, lsn) in writes {
            ryw.record_write(id, lsn);
        }

        assert_eq!(ryw.session_count(), 3);
        for (id, lsn) in writes {
            assert_eq!(ryw.get_required_lsn(id), Some(lsn));
        }
    }

    #[test]
    fn test_ryw_session_expiry() {
        // Retention short enough to lapse within the test.
        let retention = Duration::from_millis(50);
        let ryw = ReadYourWritesTracker::new(retention);

        ryw.record_write("session-1", 1000);
        assert!(ryw.has_requirement("session-1"));

        // Sleep comfortably past the retention window (100 ms).
        std::thread::sleep(retention * 2);

        assert!(!ryw.has_requirement("session-1"));
    }

    #[test]
    fn test_workflow_consistency_basic() {
        let wf = WorkflowConsistency::begin("wf-1", 1000);

        assert_eq!(wf.workflow_id(), "wf-1");
        assert_eq!(wf.start_lsn(), 1000);
        // Before any write, reads only need the start LSN.
        assert_eq!(wf.get_read_lsn_requirement(), 1000);
        assert!(!wf.has_writes());
    }

    #[test]
    fn test_workflow_consistency_writes() {
        let wf = WorkflowConsistency::begin("wf-1", 1000);

        wf.record_write(1500);
        assert!(wf.has_writes());
        assert_eq!(wf.get_read_lsn_requirement(), 1500);

        // (written LSN, expected requirement): the second write is lower
        // than the current point and must be ignored.
        for (lsn, expected) in [(2000, 2000), (1800, 2000)] {
            wf.record_write(lsn);
            assert_eq!(wf.get_read_lsn_requirement(), expected);
        }
    }

    #[test]
    fn test_workflow_tracker() {
        let workflows = WorkflowTracker::new(Duration::from_secs(60));

        workflows.begin_workflow("wf-1", 1000);
        workflows.begin_workflow("wf-2", 2000);
        assert_eq!(workflows.workflow_count(), 2);

        workflows.record_write("wf-1", 1500);
        assert_eq!(workflows.get_read_requirement("wf-1"), Some(1500));

        // Ending a workflow forgets it entirely.
        workflows.end_workflow("wf-1");
        assert_eq!(workflows.workflow_count(), 1);
        assert_eq!(workflows.get_read_requirement("wf-1"), None);
    }

    #[test]
    fn test_ryw_session_info() {
        let ryw = default_tracker();

        for lsn in [1000, 1500, 2000] {
            ryw.record_write("session-1", lsn);
        }

        let (lsn, age, count) = ryw
            .get_session_info("session-1")
            .expect("session-1 was just written");
        assert_eq!(lsn, 2000);
        assert!(age < Duration::from_secs(1));
        assert_eq!(count, 3);
    }
}