1use std::{
2 panic,
3 collections::HashMap,
4 sync::{
5 Arc, Mutex, Weak,
6 atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering},
7 },
8 thread,
9 time::{Duration, Instant},
10};
11
12pub struct TickHandlerRegistration {
18 tick_service: Weak<TickService>,
19 handler_id: u64,
20}
21
22impl TickHandlerRegistration {
23 fn new(tick_service: &Arc<TickService>, handler_id: u64) -> Self {
24 Self {
25 tick_service: Arc::downgrade(tick_service),
26 handler_id,
27 }
28 }
29
30}
31
32impl Drop for TickHandlerRegistration {
33 fn drop(&mut self) {
34 if let Some(service) = self.tick_service.upgrade() {
35 service.unregister(self.handler_id);
36 }
37 }
38}
39
40unsafe impl Send for TickHandlerRegistration {}
42unsafe impl Sync for TickHandlerRegistration {}
43
44pub trait TickHandler: Send + Sync {
46 fn tick_duration(&self) -> Duration;
49
50 fn on_tick(&self, tick_count: u64, now_ns: u64);
52
53 fn on_shutdown(&self);
55}
56
57struct TickHandlerState {
59 handler: Weak<dyn TickHandler>,
62 handler_id: u64,
64 tick_interval: u64,
67 handler_tick_count: u64,
69 stats: TickStats,
71}
72
73#[derive(Debug)]
75pub struct TickStats {
76 pub avg_tick_loop_duration_ns: AtomicU64,
78 pub avg_tick_loop_sleep_ns: AtomicU64,
80 pub max_drift_ns: AtomicU64,
82 pub total_ticks: AtomicU64,
84 pub total_drift_ns: AtomicI64,
86 pub missed_ticks: AtomicU64,
88 pub last_tick_ns: AtomicU64,
90}
91
92impl Default for TickStats {
93 fn default() -> Self {
94 Self {
95 avg_tick_loop_duration_ns: AtomicU64::new(0),
96 avg_tick_loop_sleep_ns: AtomicU64::new(0),
97 max_drift_ns: AtomicU64::new(0),
98 total_ticks: AtomicU64::new(0),
99 total_drift_ns: AtomicI64::new(0),
100 missed_ticks: AtomicU64::new(0),
101 last_tick_ns: AtomicU64::new(0),
102 }
103 }
104}
105
106impl TickStats {
107 pub fn snapshot(&self) -> TickStatsSnapshot {
109 TickStatsSnapshot {
110 avg_tick_loop_duration_ns: self.avg_tick_loop_duration_ns.load(Ordering::Relaxed),
111 avg_tick_loop_sleep_ns: self.avg_tick_loop_sleep_ns.load(Ordering::Relaxed),
112 max_drift_ns: self.max_drift_ns.load(Ordering::Relaxed) as i64,
113 total_ticks: self.total_ticks.load(Ordering::Relaxed),
114 total_drift_ns: self.total_drift_ns.load(Ordering::Relaxed),
115 missed_ticks: self.missed_ticks.load(Ordering::Relaxed),
116 last_tick_ns: self.last_tick_ns.load(Ordering::Relaxed),
117 }
118 }
119}
120
121#[derive(Clone, Copy, Debug, Default)]
123pub struct TickStatsSnapshot {
124 pub avg_tick_loop_duration_ns: u64,
126 pub avg_tick_loop_sleep_ns: u64,
128 pub max_drift_ns: i64,
130 pub total_ticks: u64,
132 pub total_drift_ns: i64,
134 pub missed_ticks: u64,
136 pub last_tick_ns: u64,
138}
139
140pub struct TickService {
142 default_tick_duration: Duration,
143 tick_duration_ns: AtomicU64,
144 handlers: Mutex<HashMap<u64, TickHandlerState>>,
145 shutdown: AtomicBool,
146 tick_thread: Mutex<Option<thread::JoinHandle<()>>>,
147 tick_stats: TickStats,
148 next_handler_id: AtomicU64,
149 shutdown_timeout_ns: AtomicU64,
150 error_count: AtomicU64,
151 log_handler_panics: AtomicBool,
152}
153
154impl TickService {
155 pub fn new(tick_duration: Duration) -> Arc<Self> {
158 debug_assert!(
160 tick_duration.as_nanos() <= u128::from(u64::MAX),
161 "Tick duration exceeds u64::MAX nanoseconds, will be truncated"
162 );
163
164 Arc::new(Self {
165 default_tick_duration: tick_duration,
166 tick_duration_ns: AtomicU64::new(
168 tick_duration.as_nanos().min(u128::from(u64::MAX)) as u64
169 ),
170 handlers: Mutex::new(HashMap::new()),
171 shutdown: AtomicBool::new(false),
172 tick_thread: Mutex::new(None),
173 tick_stats: TickStats::default(),
174 next_handler_id: AtomicU64::new(1),
175 shutdown_timeout_ns: AtomicU64::new(100_000_000), error_count: AtomicU64::new(0),
177 log_handler_panics: AtomicBool::new(false),
178 })
179 }
180
181 fn recalculate_tick_duration(&self, handlers: &mut HashMap<u64, TickHandlerState>) {
188 let default_duration_ns = self.default_tick_duration.as_nanos().min(u128::from(u64::MAX)) as u64;
189
190 let min_handler_duration_ns = handlers
191 .values()
192 .filter_map(|state| {
193 state.handler.upgrade().map(|h| {
194 let duration_ns = h.tick_duration().as_nanos();
195 debug_assert!(
197 duration_ns <= u128::from(u64::MAX),
198 "Handler tick duration exceeds u64::MAX nanoseconds, will be truncated"
199 );
200 duration_ns.min(u128::from(u64::MAX)) as u64
201 })
202 })
203 .min()
204 .unwrap_or(default_duration_ns);
205
206 self.tick_duration_ns.store(min_handler_duration_ns, Ordering::Relaxed);
209
210 for state in handlers.values_mut() {
212 if let Some(handler_arc) = state.handler.upgrade() {
213 let h_duration_ns = handler_arc.tick_duration().as_nanos();
214 debug_assert!(
215 h_duration_ns <= u128::from(u64::MAX),
216 "Handler tick duration exceeds u64::MAX nanoseconds, will be truncated"
217 );
218 let h_duration_ns = h_duration_ns.min(u128::from(u64::MAX)) as u64;
219 state.tick_interval = if h_duration_ns <= min_handler_duration_ns {
220 1
221 } else {
222 (h_duration_ns + min_handler_duration_ns - 1) / min_handler_duration_ns
223 };
224 }
225 }
226 }
227
228 pub fn register(
229 self: &Arc<Self>,
230 handler: Arc<dyn TickHandler>,
231 ) -> Option<TickHandlerRegistration> {
232 let mut handlers = self.handlers.lock().expect("handlers lock poisoned");
233
234 let handler_duration_ns = handler.tick_duration().as_nanos();
235 debug_assert!(
236 handler_duration_ns <= u128::from(u64::MAX),
237 "Handler tick duration exceeds u64::MAX nanoseconds, will be truncated"
238 );
239 let handler_duration_ns = handler_duration_ns.min(u128::from(u64::MAX)) as u64;
240
241 let current_base_ns = self.tick_duration_ns.load(Ordering::Acquire);
244 let tick_interval = if handler_duration_ns <= current_base_ns {
245 1 } else {
247 (handler_duration_ns + current_base_ns - 1) / current_base_ns };
249
250 let handler_id = self.next_handler_id.fetch_add(1, Ordering::Relaxed);
251
252 handlers.insert(handler_id, TickHandlerState {
253 handler: Arc::downgrade(&handler),
254 handler_id,
255 tick_interval,
256 handler_tick_count: 0,
257 stats: TickStats::default(),
258 });
259
260 self.recalculate_tick_duration(&mut handlers);
262
263 Some(TickHandlerRegistration::new(self, handler_id))
264 }
265
266 fn unregister(&self, handler_id: u64) -> bool {
269 let mut handlers = self.handlers.lock().expect("handlers lock poisoned");
270
271 let removed = handlers.remove(&handler_id).is_some();
272
273 if removed {
274 self.recalculate_tick_duration(&mut handlers);
276 }
277
278 removed
279 }
280
281 pub fn start(self: &Arc<Self>) {
283 let mut tick_thread_guard = self.tick_thread.lock().expect("tick_thread lock poisoned");
284 if tick_thread_guard.is_some() {
285 return; }
287
288 let service = Arc::clone(self);
289
290 let handle = thread::spawn(move || {
291 let start = Instant::now();
292 let mut tick_count: u64 = 0;
293
294 loop {
295 let loop_start = Instant::now();
296
297 let tick_duration_ns = service.tick_duration_ns.load(Ordering::Relaxed);
299 let tick_duration = Duration::from_nanos(tick_duration_ns);
300
301 let next_tick = tick_count.wrapping_add(1);
304 let target_time = if let Some(target_duration) = tick_duration.checked_mul(next_tick as u32) {
305 start.checked_add(target_duration).unwrap_or_else(|| {
306 Instant::now() + tick_duration
308 })
309 } else {
310 Instant::now() + tick_duration
312 };
313
314 if service.shutdown.load(Ordering::Acquire) {
315 let handlers = service.handlers.lock().expect("handlers lock poisoned");
317 let shutdown_start = Instant::now();
318
319 for (_handler_id, state) in handlers.iter() {
320 let shutdown_timeout = Duration::from_nanos(service.shutdown_timeout_ns.load(Ordering::Relaxed));
322 if shutdown_start.elapsed() > shutdown_timeout {
323 eprintln!("Warning: Shutdown timeout exceeded, forcing exit");
324 break;
325 }
326
327 if let Some(handler) = state.handler.upgrade() {
329 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
330 handler.on_shutdown();
331 }));
332 if service.log_handler_panics.load(Ordering::Relaxed) && result.is_err() {
333 eprintln!("Warning: Handler on_shutdown() panicked during shutdown");
334 service.error_count.fetch_add(1, Ordering::Relaxed);
335 }
336 }
337 }
338 break;
339 }
340
341 let now_ns = start.elapsed().as_nanos().min(u128::from(u64::MAX)) as u64;
343
344 let (handlers_to_call, dead_handler_ids): (Vec<_>, Vec<_>) = {
347 let mut handlers = service.handlers.lock().expect("handlers lock poisoned");
348 let estimated_handlers = handlers.len();
349 let mut calls = Vec::with_capacity(estimated_handlers);
350 let mut dead_ids = Vec::new(); for (handler_id, state) in handlers.iter_mut() {
353 let handler_alive = state.handler.upgrade();
356
357 if handler_alive.is_none() {
358 dead_ids.push(*handler_id);
359 continue;
360 }
361
362 state.stats.last_tick_ns.store(now_ns, Ordering::Relaxed);
364
365 if tick_count % state.tick_interval == 0 {
367 if let Some(handler) = handler_alive {
368 calls.push((
369 handler,
370 state.handler_tick_count,
371 *handler_id,
372 ));
373 state.handler_tick_count = state.handler_tick_count.wrapping_add(1);
374 }
375 } else {
376 state.stats.missed_ticks.fetch_add(1, Ordering::Relaxed);
378 }
379 }
380 (calls, dead_ids)
381 }; if !dead_handler_ids.is_empty() {
385 let mut handlers = service.handlers.lock().expect("handlers lock poisoned");
386 for dead_id in dead_handler_ids {
387 handlers.remove(&dead_id);
388 }
389
390 service.recalculate_tick_duration(&mut handlers);
392 }
393
394 let mut stats_updates = Vec::with_capacity(handlers_to_call.len());
397 for (handler, handler_tick_count, handler_id) in handlers_to_call {
398 let handler_start = Instant::now();
399
400 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
402 handler.on_tick(handler_tick_count, now_ns);
403 }));
404 if service.log_handler_panics.load(Ordering::Relaxed) && result.is_err() {
405 eprintln!("Warning: Handler on_tick() panicked for tick {}", tick_count);
406 service.error_count.fetch_add(1, Ordering::Relaxed);
407 }
408
409 let handler_duration_ns = handler_start.elapsed().as_nanos() as u64;
411
412 stats_updates.push((handler_id, handler_duration_ns));
414 }
415
416 if !stats_updates.is_empty() {
418 let mut handlers = service.handlers.lock().expect("handlers lock poisoned");
419 for (handler_id, handler_duration_ns) in stats_updates {
420 if let Some(state) = handlers.get_mut(&handler_id) {
421 let prev_total = state.stats.total_ticks.fetch_add(1, Ordering::Relaxed);
422 let new_total = prev_total + 1;
423
424 let new_avg_duration = if prev_total == 0 {
426 handler_duration_ns
427 } else {
428 let prev_avg = state
429 .stats
430 .avg_tick_loop_duration_ns
431 .load(Ordering::Relaxed) as i64;
432 let new_avg = prev_avg
433 + ((handler_duration_ns as i64 - prev_avg) / new_total as i64);
434 new_avg as u64
435 };
436 state
437 .stats
438 .avg_tick_loop_duration_ns
439 .store(new_avg_duration, Ordering::Relaxed);
440 }
441 }
442 }
443
444 tick_count = tick_count.wrapping_add(1);
445
446 let loop_end = Instant::now();
448 let loop_duration_ns = loop_end.duration_since(loop_start).as_nanos() as u64;
449
450 let now = Instant::now();
452 let actual_sleep_ns =
453 if let Some(sleep_duration) = target_time.checked_duration_since(now) {
454 thread::sleep(sleep_duration);
455 sleep_duration.as_nanos() as u64
456 } else {
457 thread::yield_now();
459 0
460 };
461
462 let after_sleep = Instant::now();
464 let drift_ns_i64 = if after_sleep >= target_time {
465 let drift = after_sleep.duration_since(target_time).as_nanos();
467 if drift > i64::MAX as u128 {
468 eprintln!("Warning: Extreme positive drift detected ({} ns), exceeding i64::MAX", drift);
469 i64::MAX
470 } else {
471 drift as i64
472 }
473 } else {
474 let drift = target_time.duration_since(after_sleep).as_nanos();
476 if drift > i64::MAX as u128 {
477 eprintln!("Warning: Extreme negative drift detected ({} ns), exceeding i64::MAX", drift);
478 i64::MIN
479 } else {
480 -(drift as i64)
481 }
482 };
483
484 let prev_total = service
486 .tick_stats
487 .total_ticks
488 .fetch_add(1, Ordering::Relaxed);
489 let new_total = prev_total + 1;
490
491 let new_avg_duration = if prev_total == 0 {
493 loop_duration_ns
494 } else {
495 let prev_avg = service
496 .tick_stats
497 .avg_tick_loop_duration_ns
498 .load(Ordering::Relaxed) as i64;
499 let new_avg =
500 prev_avg + ((loop_duration_ns as i64 - prev_avg) / new_total as i64);
501 new_avg as u64
502 };
503 service
504 .tick_stats
505 .avg_tick_loop_duration_ns
506 .store(new_avg_duration, Ordering::Relaxed);
507
508 let new_avg_sleep = if prev_total == 0 {
509 actual_sleep_ns
510 } else {
511 let prev_avg = service
512 .tick_stats
513 .avg_tick_loop_sleep_ns
514 .load(Ordering::Relaxed) as i64;
515 let new_avg =
516 prev_avg + ((actual_sleep_ns as i64 - prev_avg) / new_total as i64);
517 new_avg as u64
518 };
519 service
520 .tick_stats
521 .avg_tick_loop_sleep_ns
522 .store(new_avg_sleep, Ordering::Relaxed);
523
524 let drift_abs = drift_ns_i64.unsigned_abs();
526 let mut current_max = service.tick_stats.max_drift_ns.load(Ordering::Relaxed);
527 while drift_abs > current_max {
528 match service.tick_stats.max_drift_ns.compare_exchange_weak(
529 current_max,
530 drift_abs,
531 Ordering::Relaxed,
532 Ordering::Relaxed,
533 ) {
534 Ok(_) => break,
535 Err(actual) => current_max = actual,
536 }
537 }
538
539 service
541 .tick_stats
542 .total_drift_ns
543 .fetch_add(drift_ns_i64, Ordering::Relaxed);
544 }
545 });
546
547 *tick_thread_guard = Some(handle);
548 }
549
550 pub fn shutdown(&self) {
553 if self.shutdown.swap(true, Ordering::Release) {
555 return; }
557
558 if let Some(handle) = self.tick_thread.lock().expect("tick_thread lock poisoned").take() {
560 let _ = handle.join();
561 }
562 }
563
564 pub fn tick_stats(&self) -> TickStatsSnapshot {
566 self.tick_stats.snapshot()
567 }
568
569 pub fn current_tick_duration_ns(&self) -> u64 {
571 self.tick_duration_ns.load(Ordering::Acquire)
572 }
573
574 pub fn set_shutdown_timeout(&self, timeout: Duration) {
576 let timeout_ns = timeout.as_nanos().min(u128::from(u64::MAX)) as u64;
577 self.shutdown_timeout_ns.store(timeout_ns, Ordering::Relaxed);
578 }
579
580
581 pub fn error_stats(&self) -> (u64, TickStatsSnapshot) {
583 (self.error_count.load(Ordering::Relaxed), self.tick_stats.snapshot())
584 }
585
586 pub fn set_panic_logging(&self, enabled: bool) {
588 self.log_handler_panics.store(enabled, Ordering::Relaxed);
589 }
590
591 pub fn is_panic_logging_enabled(&self) -> bool {
593 self.log_handler_panics.load(Ordering::Relaxed)
594 }
595
596 pub fn handler_stats(&self) -> Vec<TickStatsSnapshot> {
598 let handlers = self.handlers.lock().expect("handlers lock poisoned");
599 handlers
600 .values()
601 .map(|state| state.stats.snapshot())
602 .collect()
603 }
604}
605
606impl Drop for TickService {
607 fn drop(&mut self) {
608 if !self.shutdown.load(Ordering::Acquire) {
609 self.shutdown();
610 }
611 }
612}