1use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::{Duration, Instant};
8
9use dashmap::DashMap;
10
11pub type SessionId = String;
13
14#[derive(Debug)]
16pub struct RywSession {
17 pub last_write_lsn: AtomicU64,
19
20 pub written_at: Instant,
22
23 pub write_count: AtomicU64,
25}
26
27impl RywSession {
28 fn new(lsn: u64) -> Self {
29 Self {
30 last_write_lsn: AtomicU64::new(lsn),
31 written_at: Instant::now(),
32 write_count: AtomicU64::new(1),
33 }
34 }
35
36 fn update(&self, lsn: u64) {
37 let current = self.last_write_lsn.load(Ordering::Relaxed);
39 if lsn > current {
40 self.last_write_lsn.store(lsn, Ordering::Relaxed);
41 }
42 self.write_count.fetch_add(1, Ordering::Relaxed);
43 }
44
45 fn is_expired(&self, retention: Duration) -> bool {
46 self.written_at.elapsed() > retention
47 }
48
49 fn get_lsn(&self) -> u64 {
50 self.last_write_lsn.load(Ordering::Relaxed)
51 }
52}
53
54pub struct ReadYourWritesTracker {
59 sessions: DashMap<SessionId, RywSession>,
61
62 retention: Duration,
64
65 last_cleanup: parking_lot::Mutex<Instant>,
67
68 cleanup_interval: Duration,
70}
71
72impl ReadYourWritesTracker {
73 pub fn new(retention: Duration) -> Self {
75 Self {
76 sessions: DashMap::new(),
77 retention,
78 last_cleanup: parking_lot::Mutex::new(Instant::now()),
79 cleanup_interval: Duration::from_secs(60),
80 }
81 }
82
83 pub fn with_defaults() -> Self {
85 Self::new(Duration::from_secs(300))
86 }
87
88 pub fn record_write(&self, session_id: &str, lsn: u64) {
90 self.maybe_cleanup();
91
92 self.sessions
93 .entry(session_id.to_string())
94 .and_modify(|session| session.update(lsn))
95 .or_insert_with(|| RywSession::new(lsn));
96 }
97
98 pub fn get_required_lsn(&self, session_id: &str) -> Option<u64> {
102 self.sessions.get(session_id).and_then(|session| {
103 if session.is_expired(self.retention) {
104 None
105 } else {
106 Some(session.get_lsn())
107 }
108 })
109 }
110
111 pub fn has_requirement(&self, session_id: &str) -> bool {
113 self.get_required_lsn(session_id).is_some()
114 }
115
116 pub fn clear(&self, session_id: &str) {
118 self.sessions.remove(session_id);
119 }
120
121 pub fn clear_session(&self, session_id: &str) {
123 self.sessions.remove(session_id);
124 }
125
126 pub fn session_count(&self) -> usize {
128 self.sessions.len()
129 }
130
131 pub fn get_session_info(&self, session_id: &str) -> Option<(u64, Duration, u64)> {
133 self.sessions.get(session_id).map(|session| {
134 (
135 session.get_lsn(),
136 session.written_at.elapsed(),
137 session.write_count.load(Ordering::Relaxed),
138 )
139 })
140 }
141
142 fn maybe_cleanup(&self) {
144 let mut last_cleanup = self.last_cleanup.lock();
145 if last_cleanup.elapsed() < self.cleanup_interval {
146 return;
147 }
148
149 self.sessions
151 .retain(|_, session| !session.is_expired(self.retention));
152
153 *last_cleanup = Instant::now();
154 }
155
156 pub fn cleanup(&self) {
158 self.sessions
159 .retain(|_, session| !session.is_expired(self.retention));
160 *self.last_cleanup.lock() = Instant::now();
161 }
162
163 pub fn clear_all(&self) {
165 self.sessions.clear();
166 }
167}
168
169impl std::fmt::Debug for ReadYourWritesTracker {
170 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171 f.debug_struct("ReadYourWritesTracker")
172 .field("session_count", &self.sessions.len())
173 .field("retention", &self.retention)
174 .finish()
175 }
176}
177
178#[derive(Debug)]
183pub struct WorkflowConsistency {
184 workflow_id: String,
186
187 start_lsn: u64,
189
190 consistency_point: AtomicU64,
192
193 started_at: Instant,
195
196 step_count: AtomicU64,
198}
199
200impl WorkflowConsistency {
201 pub fn begin(workflow_id: &str, current_lsn: u64) -> Self {
203 Self {
204 workflow_id: workflow_id.to_string(),
205 start_lsn: current_lsn,
206 consistency_point: AtomicU64::new(current_lsn),
207 started_at: Instant::now(),
208 step_count: AtomicU64::new(0),
209 }
210 }
211
212 pub fn record_write(&self, write_lsn: u64) {
214 let current = self.consistency_point.load(Ordering::Relaxed);
216 if write_lsn > current {
217 self.consistency_point.store(write_lsn, Ordering::Relaxed);
218 }
219 self.step_count.fetch_add(1, Ordering::Relaxed);
220 }
221
222 pub fn get_read_lsn_requirement(&self) -> u64 {
224 self.consistency_point.load(Ordering::Relaxed)
225 }
226
227 pub fn workflow_id(&self) -> &str {
229 &self.workflow_id
230 }
231
232 pub fn start_lsn(&self) -> u64 {
234 self.start_lsn
235 }
236
237 pub fn duration(&self) -> Duration {
239 self.started_at.elapsed()
240 }
241
242 pub fn step_count(&self) -> u64 {
244 self.step_count.load(Ordering::Relaxed)
245 }
246
247 pub fn has_writes(&self) -> bool {
249 self.consistency_point.load(Ordering::Relaxed) > self.start_lsn
250 }
251}
252
253pub struct WorkflowTracker {
255 workflows: DashMap<String, WorkflowConsistency>,
256 max_age: Duration,
257}
258
259impl WorkflowTracker {
260 pub fn new(max_age: Duration) -> Self {
262 Self {
263 workflows: DashMap::new(),
264 max_age,
265 }
266 }
267
268 pub fn begin_workflow(&self, workflow_id: &str, current_lsn: u64) {
270 self.workflows.insert(
271 workflow_id.to_string(),
272 WorkflowConsistency::begin(workflow_id, current_lsn),
273 );
274 }
275
276 pub fn record_write(&self, workflow_id: &str, write_lsn: u64) {
278 if let Some(workflow) = self.workflows.get(workflow_id) {
279 workflow.record_write(write_lsn);
280 }
281 }
282
283 pub fn get_read_requirement(&self, workflow_id: &str) -> Option<u64> {
285 self.workflows
286 .get(workflow_id)
287 .map(|w| w.get_read_lsn_requirement())
288 }
289
290 pub fn end_workflow(&self, workflow_id: &str) {
292 self.workflows.remove(workflow_id);
293 }
294
295 pub fn cleanup(&self) {
297 self.workflows
298 .retain(|_, workflow| workflow.duration() < self.max_age);
299 }
300
301 pub fn workflow_count(&self) -> usize {
303 self.workflows.len()
304 }
305}
306
307impl Default for WorkflowTracker {
308 fn default() -> Self {
309 Self::new(Duration::from_secs(3600)) }
311}
312
313#[cfg(test)]
314mod tests {
315 use super::*;
316
317 #[test]
318 fn test_ryw_tracker_basic() {
319 let tracker = ReadYourWritesTracker::with_defaults();
320
321 tracker.record_write("session-1", 1000);
323
324 assert!(tracker.has_requirement("session-1"));
326 assert_eq!(tracker.get_required_lsn("session-1"), Some(1000));
327 }
328
329 #[test]
330 fn test_ryw_tracker_updates() {
331 let tracker = ReadYourWritesTracker::with_defaults();
332
333 tracker.record_write("session-1", 1000);
334 tracker.record_write("session-1", 1500);
335
336 assert_eq!(tracker.get_required_lsn("session-1"), Some(1500));
338
339 tracker.record_write("session-1", 1200);
341 assert_eq!(tracker.get_required_lsn("session-1"), Some(1500));
342 }
343
344 #[test]
345 fn test_ryw_tracker_clear() {
346 let tracker = ReadYourWritesTracker::with_defaults();
347
348 tracker.record_write("session-1", 1000);
349 assert!(tracker.has_requirement("session-1"));
350
351 tracker.clear("session-1");
352 assert!(!tracker.has_requirement("session-1"));
353 assert_eq!(tracker.get_required_lsn("session-1"), None);
354 }
355
356 #[test]
357 fn test_ryw_tracker_multiple_sessions() {
358 let tracker = ReadYourWritesTracker::with_defaults();
359
360 tracker.record_write("session-1", 1000);
361 tracker.record_write("session-2", 2000);
362 tracker.record_write("session-3", 500);
363
364 assert_eq!(tracker.session_count(), 3);
365 assert_eq!(tracker.get_required_lsn("session-1"), Some(1000));
366 assert_eq!(tracker.get_required_lsn("session-2"), Some(2000));
367 assert_eq!(tracker.get_required_lsn("session-3"), Some(500));
368 }
369
370 #[test]
371 fn test_ryw_session_expiry() {
372 let tracker = ReadYourWritesTracker::new(Duration::from_millis(50));
374
375 tracker.record_write("session-1", 1000);
376 assert!(tracker.has_requirement("session-1"));
377
378 std::thread::sleep(Duration::from_millis(100));
380
381 assert!(!tracker.has_requirement("session-1"));
383 }
384
385 #[test]
386 fn test_workflow_consistency_basic() {
387 let workflow = WorkflowConsistency::begin("wf-1", 1000);
388
389 assert_eq!(workflow.workflow_id(), "wf-1");
390 assert_eq!(workflow.start_lsn(), 1000);
391 assert_eq!(workflow.get_read_lsn_requirement(), 1000);
392 assert!(!workflow.has_writes());
393 }
394
395 #[test]
396 fn test_workflow_consistency_writes() {
397 let workflow = WorkflowConsistency::begin("wf-1", 1000);
398
399 workflow.record_write(1500);
400 assert!(workflow.has_writes());
401 assert_eq!(workflow.get_read_lsn_requirement(), 1500);
402
403 workflow.record_write(2000);
404 assert_eq!(workflow.get_read_lsn_requirement(), 2000);
405
406 workflow.record_write(1800);
408 assert_eq!(workflow.get_read_lsn_requirement(), 2000);
409 }
410
411 #[test]
412 fn test_workflow_tracker() {
413 let tracker = WorkflowTracker::new(Duration::from_secs(60));
414
415 tracker.begin_workflow("wf-1", 1000);
416 tracker.begin_workflow("wf-2", 2000);
417
418 assert_eq!(tracker.workflow_count(), 2);
419
420 tracker.record_write("wf-1", 1500);
421 assert_eq!(tracker.get_read_requirement("wf-1"), Some(1500));
422
423 tracker.end_workflow("wf-1");
424 assert_eq!(tracker.workflow_count(), 1);
425 assert_eq!(tracker.get_read_requirement("wf-1"), None);
426 }
427
428 #[test]
429 fn test_ryw_session_info() {
430 let tracker = ReadYourWritesTracker::with_defaults();
431
432 tracker.record_write("session-1", 1000);
433 tracker.record_write("session-1", 1500);
434 tracker.record_write("session-1", 2000);
435
436 let (lsn, age, count) = tracker.get_session_info("session-1").unwrap();
437 assert_eq!(lsn, 2000);
438 assert!(age < Duration::from_secs(1));
439 assert_eq!(count, 3);
440 }
441}