// maniac_runtime/runtime/ticker.rs

1use std::{
2    panic,
3    collections::HashMap,
4    sync::{
5        Arc, Mutex, Weak,
6        atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering},
7    },
8    thread,
9    time::{Duration, Instant},
10};
11
// Note: Panic logging is per-instance; enable or disable it at runtime via
// TickService::set_panic_logging(). It defaults to off.
14
/// RAII guard for a registered tick handler.
/// Automatically unregisters the handler when dropped.
///
/// Holds only a weak back-reference to the service, so a leaked guard does
/// not keep the `TickService` alive.
pub struct TickHandlerRegistration {
    // Weak reference to the owning service; upgraded on drop to unregister.
    tick_service: Weak<TickService>,
    // Registration id issued by TickService::register().
    handler_id: u64,
}

impl TickHandlerRegistration {
    // Internal constructor; only TickService::register() creates guards.
    fn new(tick_service: &Arc<TickService>, handler_id: u64) -> Self {
        Self {
            tick_service: Arc::downgrade(tick_service),
            handler_id,
        }
    }

}
31
32impl Drop for TickHandlerRegistration {
33    fn drop(&mut self) {
34        if let Some(service) = self.tick_service.upgrade() {
35            service.unregister(self.handler_id);
36        }
37    }
38}
39
// NOTE(review): the former `unsafe impl Send/Sync for TickHandlerRegistration`
// blocks were removed. The struct contains only `Weak<TickService>` and `u64`,
// and every field of `TickService` (atomics, `Mutex`-wrapped state, `Duration`)
// is itself `Send + Sync`, so the compiler derives both marker traits
// automatically. Hand-written unsafe marker impls would silently become
// unsound if a non-thread-safe field were ever added.
43
/// Trait for services that need to be notified on each tick.
///
/// Implementations must be `Send + Sync`: all callbacks are invoked from the
/// dedicated tick thread.
pub trait TickHandler: Send + Sync {
    /// Returns the desired tick duration for this handler.
    /// The TickService will use the minimum of all registered handlers' tick durations.
    fn tick_duration(&self) -> Duration;

    /// Called on each tick with the current time in nanoseconds and tick count.
    ///
    /// `tick_count` is this handler's own invocation counter (not the global
    /// tick count); `now_ns` is nanoseconds elapsed since the tick thread
    /// started. Panics in this method are caught and (optionally) logged.
    fn on_tick(&self, tick_count: u64, now_ns: u64);

    /// Called when the tick service is shutting down.
    fn on_shutdown(&self);
}
56
/// State tracking for a registered tick handler.
struct TickHandlerState {
    /// Weak reference to the handler implementation.
    /// This allows handlers to be dropped even if they're still registered;
    /// the tick thread prunes entries whose upgrade fails.
    handler: Weak<dyn TickHandler>,
    /// Unique identifier for this handler registration.
    handler_id: u64,
    /// How many base ticks must elapse between calls to this handler.
    /// E.g., if base tick is 1ms and handler wants 10ms, tick_interval = 10.
    tick_interval: u64,
    /// The handler's own monotonic tick count, incremented on each invocation
    /// and passed as the `tick_count` argument to `on_tick`.
    handler_tick_count: u64,
    /// Statistics for this handler's tick timing and performance.
    stats: TickStats,
}
72
/// Statistics for the tick thread to monitor timing accuracy and performance.
///
/// Also reused per-handler (see `TickHandlerState::stats`), where
/// `avg_tick_loop_duration_ns` holds that handler's average `on_tick` runtime.
#[derive(Debug)]
pub struct TickStats {
    /// Average duration of tick loop processing (excluding sleep) in nanoseconds
    pub avg_tick_loop_duration_ns: AtomicU64,
    /// Average sleep duration in nanoseconds
    pub avg_tick_loop_sleep_ns: AtomicU64,
    /// Maximum absolute drift observed in nanoseconds
    /// (stored as an unsigned absolute value; sign is dropped)
    pub max_drift_ns: AtomicU64,
    /// Total number of ticks processed
    pub total_ticks: AtomicU64,
    /// Total cumulative drift in nanoseconds (positive = behind schedule)
    pub total_drift_ns: AtomicI64,
    /// Number of ticks where this handler wasn't called (due to interval)
    pub missed_ticks: AtomicU64,
    /// Timestamp of the last tick in nanoseconds
    pub last_tick_ns: AtomicU64,
}
91
92impl Default for TickStats {
93    fn default() -> Self {
94        Self {
95            avg_tick_loop_duration_ns: AtomicU64::new(0),
96            avg_tick_loop_sleep_ns: AtomicU64::new(0),
97            max_drift_ns: AtomicU64::new(0),
98            total_ticks: AtomicU64::new(0),
99            total_drift_ns: AtomicI64::new(0),
100            missed_ticks: AtomicU64::new(0),
101            last_tick_ns: AtomicU64::new(0),
102        }
103    }
104}
105
106impl TickStats {
107    /// Returns a snapshot of the current statistics
108    pub fn snapshot(&self) -> TickStatsSnapshot {
109        TickStatsSnapshot {
110            avg_tick_loop_duration_ns: self.avg_tick_loop_duration_ns.load(Ordering::Relaxed),
111            avg_tick_loop_sleep_ns: self.avg_tick_loop_sleep_ns.load(Ordering::Relaxed),
112            max_drift_ns: self.max_drift_ns.load(Ordering::Relaxed) as i64,
113            total_ticks: self.total_ticks.load(Ordering::Relaxed),
114            total_drift_ns: self.total_drift_ns.load(Ordering::Relaxed),
115            missed_ticks: self.missed_ticks.load(Ordering::Relaxed),
116            last_tick_ns: self.last_tick_ns.load(Ordering::Relaxed),
117        }
118    }
119}
120
/// Snapshot of tick statistics at a point in time.
///
/// Fields are sampled independently (see `TickStats::snapshot`), so the
/// values are individually accurate but not guaranteed mutually consistent.
#[derive(Clone, Copy, Debug, Default)]
pub struct TickStatsSnapshot {
    /// Average duration of tick loop processing (excluding sleep) in nanoseconds
    pub avg_tick_loop_duration_ns: u64,
    /// Average sleep duration in nanoseconds
    pub avg_tick_loop_sleep_ns: u64,
    /// Maximum absolute drift observed in nanoseconds
    pub max_drift_ns: i64,
    /// Total number of ticks processed
    pub total_ticks: u64,
    /// Total cumulative drift in nanoseconds (positive = behind schedule)
    pub total_drift_ns: i64,
    /// Number of ticks where this handler wasn't called (due to interval)
    pub missed_ticks: u64,
    /// Timestamp of the last tick in nanoseconds
    pub last_tick_ns: u64,
}
139
/// Shared tick service that can coordinate multiple WorkerServices.
pub struct TickService {
    /// Base tick duration used when no live handlers are registered.
    default_tick_duration: Duration,
    /// Current base tick duration in nanoseconds; recalculated as handlers
    /// come and go, re-read by the tick thread every iteration.
    tick_duration_ns: AtomicU64,
    /// Registered handlers keyed by registration id.
    handlers: Mutex<HashMap<u64, TickHandlerState>>,
    /// Set by shutdown(); observed by the tick thread to exit its loop.
    shutdown: AtomicBool,
    /// Join handle for the tick thread, present once start() has run.
    tick_thread: Mutex<Option<thread::JoinHandle<()>>>,
    /// Aggregate timing statistics for the tick loop itself.
    tick_stats: TickStats,
    /// Monotonically increasing source of registration ids (starts at 1).
    next_handler_id: AtomicU64,
    /// Time budget for on_shutdown() callbacks, in nanoseconds (default 100ms).
    shutdown_timeout_ns: AtomicU64,
    /// Count of handler panics observed (on_tick / on_shutdown).
    error_count: AtomicU64,
    /// Whether handler panics are logged to stderr (off by default).
    log_handler_panics: AtomicBool,
}
153
impl TickService {
    /// Create a new `TickService` with the specified default tick duration.
    /// The actual tick duration used will be the minimum of the default and
    /// all registered handlers.
    pub fn new(tick_duration: Duration) -> Arc<Self> {
        // Catch duration truncation in debug builds.
        debug_assert!(
            tick_duration.as_nanos() <= u128::from(u64::MAX),
            "Tick duration exceeds u64::MAX nanoseconds, will be truncated"
        );

        // Saturating conversion into u64 nanoseconds.
        let tick_ns = tick_duration.as_nanos().min(u128::from(u64::MAX)) as u64;

        Arc::new(Self {
            default_tick_duration: tick_duration,
            // Relaxed is fine: the value is only set during construction here.
            tick_duration_ns: AtomicU64::new(tick_ns),
            handlers: Mutex::new(HashMap::new()),
            shutdown: AtomicBool::new(false),
            tick_thread: Mutex::new(None),
            tick_stats: TickStats::default(),
            next_handler_id: AtomicU64::new(1),
            // 100 ms default budget for handler shutdown callbacks.
            shutdown_timeout_ns: AtomicU64::new(100_000_000),
            error_count: AtomicU64::new(0),
            log_handler_panics: AtomicBool::new(false),
        })
    }
180
181    /// Register a handler to be called on each tick.
182    /// Returns a registration guard that will automatically unregister the handler when dropped.
183    /// Recalculates the base tick duration and all handler intervals.
184    /// This is called when handlers are added or removed to maintain optimal timing.
185    ///
186    /// Note: Must be called while holding the handlers lock to prevent race conditions.
187    fn recalculate_tick_duration(&self, handlers: &mut HashMap<u64, TickHandlerState>) {
188        let default_duration_ns = self.default_tick_duration.as_nanos().min(u128::from(u64::MAX)) as u64;
189
190        let min_handler_duration_ns = handlers
191            .values()
192            .filter_map(|state| {
193                state.handler.upgrade().map(|h| {
194                    let duration_ns = h.tick_duration().as_nanos();
195                    // Debug assertion for duration truncation
196                    debug_assert!(
197                        duration_ns <= u128::from(u64::MAX),
198                        "Handler tick duration exceeds u64::MAX nanoseconds, will be truncated"
199                    );
200                    duration_ns.min(u128::from(u64::MAX)) as u64
201                })
202            })
203            .min()
204            .unwrap_or(default_duration_ns);
205
206        // Always store the new base duration and recalculate intervals
207        // We hold the handlers lock so this is safe from race conditions
208        self.tick_duration_ns.store(min_handler_duration_ns, Ordering::Relaxed);
209
210        // Recalculate intervals for all handlers with the new base duration
211        for state in handlers.values_mut() {
212            if let Some(handler_arc) = state.handler.upgrade() {
213                let h_duration_ns = handler_arc.tick_duration().as_nanos();
214                debug_assert!(
215                    h_duration_ns <= u128::from(u64::MAX),
216                    "Handler tick duration exceeds u64::MAX nanoseconds, will be truncated"
217                );
218                let h_duration_ns = h_duration_ns.min(u128::from(u64::MAX)) as u64;
219                state.tick_interval = if h_duration_ns <= min_handler_duration_ns {
220                    1
221                } else {
222                    (h_duration_ns + min_handler_duration_ns - 1) / min_handler_duration_ns
223                };
224            }
225        }
226    }
227
228    pub fn register(
229        self: &Arc<Self>,
230        handler: Arc<dyn TickHandler>,
231    ) -> Option<TickHandlerRegistration> {
232        let mut handlers = self.handlers.lock().expect("handlers lock poisoned");
233        
234        let handler_duration_ns = handler.tick_duration().as_nanos();
235        debug_assert!(
236            handler_duration_ns <= u128::from(u64::MAX),
237            "Handler tick duration exceeds u64::MAX nanoseconds, will be truncated"
238        );
239        let handler_duration_ns = handler_duration_ns.min(u128::from(u64::MAX)) as u64;
240
241        // Calculate how many base ticks between calls to this handler
242        // Use Acquire ordering to get the latest tick duration from other threads
243        let current_base_ns = self.tick_duration_ns.load(Ordering::Acquire);
244        let tick_interval = if handler_duration_ns <= current_base_ns {
245            1 // Call on every tick
246        } else {
247            (handler_duration_ns + current_base_ns - 1) / current_base_ns // Round up
248        };
249        
250        let handler_id = self.next_handler_id.fetch_add(1, Ordering::Relaxed);
251        
252        handlers.insert(handler_id, TickHandlerState {
253            handler: Arc::downgrade(&handler),
254            handler_id,
255            tick_interval,
256            handler_tick_count: 0,
257            stats: TickStats::default(),
258        });
259
260        // Update tick duration and recalculate intervals if needed
261        self.recalculate_tick_duration(&mut handlers);
262
263        Some(TickHandlerRegistration::new(self, handler_id))
264    }
265
266    /// Internal method to unregister a handler by ID.
267    /// Returns true if a handler was found and removed.
268    fn unregister(&self, handler_id: u64) -> bool {
269        let mut handlers = self.handlers.lock().expect("handlers lock poisoned");
270        
271        let removed = handlers.remove(&handler_id).is_some();
272        
273        if removed {
274            // Recalculate tick duration and intervals after removing a handler
275            self.recalculate_tick_duration(&mut handlers);
276        }
277
278        removed
279    }
280
281    /// Start the tick thread if not already started.
282    pub fn start(self: &Arc<Self>) {
283        let mut tick_thread_guard = self.tick_thread.lock().expect("tick_thread lock poisoned");
284        if tick_thread_guard.is_some() {
285            return; // Already started
286        }
287
288        let service = Arc::clone(self);
289
290        let handle = thread::spawn(move || {
291            let start = Instant::now();
292            let mut tick_count: u64 = 0;
293
294            loop {
295                let loop_start = Instant::now();
296
297                // Reload tick_duration from atomic on each iteration (allows dynamic adjustment)
298                let tick_duration_ns = service.tick_duration_ns.load(Ordering::Relaxed);
299                let tick_duration = Duration::from_nanos(tick_duration_ns);
300
301                // Calculate target time for this tick to maintain precise timing
302                // We calculate based on the next tick to know when to wake up
303                let next_tick = tick_count.wrapping_add(1);
304                let target_time = if let Some(target_duration) = tick_duration.checked_mul(next_tick as u32) {
305                    start.checked_add(target_duration).unwrap_or_else(|| {
306                        // If we overflow Instant, just use now + tick_duration as fallback
307                        Instant::now() + tick_duration
308                    })
309                } else {
310                    // tick_count is too large to multiply, use incremental approach
311                    Instant::now() + tick_duration
312                };
313
314                if service.shutdown.load(Ordering::Acquire) {
315                    // Graceful shutdown: notify all handlers with timeout
316                    let handlers = service.handlers.lock().expect("handlers lock poisoned");
317                    let shutdown_start = Instant::now();
318                    
319                    for (_handler_id, state) in handlers.iter() {
320                        // Check if we've exceeded the shutdown timeout
321                        let shutdown_timeout = Duration::from_nanos(service.shutdown_timeout_ns.load(Ordering::Relaxed));
322                        if shutdown_start.elapsed() > shutdown_timeout {
323                            eprintln!("Warning: Shutdown timeout exceeded, forcing exit");
324                            break;
325                        }
326                        
327                        // Call on_shutdown with panic isolation
328                        if let Some(handler) = state.handler.upgrade() {
329                            let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
330                                handler.on_shutdown();
331                            }));
332                            if service.log_handler_panics.load(Ordering::Relaxed) && result.is_err() {
333                                eprintln!("Warning: Handler on_shutdown() panicked during shutdown");
334                                service.error_count.fetch_add(1, Ordering::Relaxed);
335                            }
336                        }
337                    }
338                    break;
339                }
340
341                // Calculate current time
342                let now_ns = start.elapsed().as_nanos().min(u128::from(u64::MAX)) as u64;
343
344                // Collect handlers to call and release lock before calling them
345                // Pre-allocate vectors with estimated capacity to reduce allocations
346                let (handlers_to_call, dead_handler_ids): (Vec<_>, Vec<_>) = {
347                    let mut handlers = service.handlers.lock().expect("handlers lock poisoned");
348                    let estimated_handlers = handlers.len();
349                    let mut calls = Vec::with_capacity(estimated_handlers);
350                    let mut dead_ids = Vec::new(); // Usually empty, so no pre-allocation
351                    
352                    for (handler_id, state) in handlers.iter_mut() {
353                        // Check if handler is still alive - if not, mark for cleanup immediately
354                        // This ensures dead handlers are cleaned up promptly regardless of interval
355                        let handler_alive = state.handler.upgrade();
356
357                        if handler_alive.is_none() {
358                            dead_ids.push(*handler_id);
359                            continue;
360                        }
361
362                        // Always update the last tick timestamp
363                        state.stats.last_tick_ns.store(now_ns, Ordering::Relaxed);
364
365                        // Only call on_tick if tick_count is a multiple of this handler's interval
366                        if tick_count % state.tick_interval == 0 {
367                            if let Some(handler) = handler_alive {
368                                calls.push((
369                                    handler,
370                                    state.handler_tick_count,
371                                    *handler_id,
372                                ));
373                                state.handler_tick_count = state.handler_tick_count.wrapping_add(1);
374                            }
375                        } else {
376                            // Track missed tick for statistics
377                            state.stats.missed_ticks.fetch_add(1, Ordering::Relaxed);
378                        }
379                    }
380                    (calls, dead_ids)
381                }; // Lock released here
382
383                // Clean up dead handlers if any
384                if !dead_handler_ids.is_empty() {
385                    let mut handlers = service.handlers.lock().expect("handlers lock poisoned");
386                    for dead_id in dead_handler_ids {
387                        handlers.remove(&dead_id);
388                    }
389                    
390                    // Use the shared recalculation method to update tick duration and intervals
391                    service.recalculate_tick_duration(&mut handlers);
392                }
393
394                // Call handlers outside the lock to prevent deadlocks
395                // Process handlers and collect stats updates to minimize lock contention
396                let mut stats_updates = Vec::with_capacity(handlers_to_call.len());
397                for (handler, handler_tick_count, handler_id) in handlers_to_call {
398                    let handler_start = Instant::now();
399
400                    // Call on_tick with panic isolation
401                    let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
402                        handler.on_tick(handler_tick_count, now_ns);
403                    }));
404                    if service.log_handler_panics.load(Ordering::Relaxed) && result.is_err() {
405                        eprintln!("Warning: Handler on_tick() panicked for tick {}", tick_count);
406                        service.error_count.fetch_add(1, Ordering::Relaxed);
407                    }
408
409                    // Measure handler execution duration
410                    let handler_duration_ns = handler_start.elapsed().as_nanos() as u64;
411
412                    // Store stats update for batch processing
413                    stats_updates.push((handler_id, handler_duration_ns));
414                }
415
416                // Batch update handler statistics to minimize lock contention
417                if !stats_updates.is_empty() {
418                    let mut handlers = service.handlers.lock().expect("handlers lock poisoned");
419                    for (handler_id, handler_duration_ns) in stats_updates {
420                        if let Some(state) = handlers.get_mut(&handler_id) {
421                            let prev_total = state.stats.total_ticks.fetch_add(1, Ordering::Relaxed);
422                            let new_total = prev_total + 1;
423
424                            // Update running average for handler duration
425                            let new_avg_duration = if prev_total == 0 {
426                                handler_duration_ns
427                            } else {
428                                let prev_avg = state
429                                    .stats
430                                    .avg_tick_loop_duration_ns
431                                    .load(Ordering::Relaxed) as i64;
432                                let new_avg = prev_avg
433                                    + ((handler_duration_ns as i64 - prev_avg) / new_total as i64);
434                                new_avg as u64
435                            };
436                            state
437                                .stats
438                                .avg_tick_loop_duration_ns
439                                .store(new_avg_duration, Ordering::Relaxed);
440                        }
441                    }
442                }
443
444                tick_count = tick_count.wrapping_add(1);
445
446                // Measure tick loop processing duration
447                let loop_end = Instant::now();
448                let loop_duration_ns = loop_end.duration_since(loop_start).as_nanos() as u64;
449
450                // Sleep until target time, accounting for processing time
451                let now = Instant::now();
452                let actual_sleep_ns =
453                    if let Some(sleep_duration) = target_time.checked_duration_since(now) {
454                        thread::sleep(sleep_duration);
455                        sleep_duration.as_nanos() as u64
456                    } else {
457                        // We're behind schedule - yield but don't sleep to catch up
458                        thread::yield_now();
459                        0
460                    };
461
462                // Calculate drift (positive = behind schedule, negative = ahead of schedule)
463                let after_sleep = Instant::now();
464                let drift_ns_i64 = if after_sleep >= target_time {
465                    // Behind schedule (or exactly on time)
466                    let drift = after_sleep.duration_since(target_time).as_nanos();
467                    if drift > i64::MAX as u128 {
468                        eprintln!("Warning: Extreme positive drift detected ({} ns), exceeding i64::MAX", drift);
469                        i64::MAX
470                    } else {
471                        drift as i64
472                    }
473                } else {
474                    // Ahead of schedule (woke up early) - negative drift
475                    let drift = target_time.duration_since(after_sleep).as_nanos();
476                    if drift > i64::MAX as u128 {
477                        eprintln!("Warning: Extreme negative drift detected ({} ns), exceeding i64::MAX", drift);
478                        i64::MIN
479                    } else {
480                        -(drift as i64)
481                    }
482                };
483
484                // Update tick stats atomically
485                let prev_total = service
486                    .tick_stats
487                    .total_ticks
488                    .fetch_add(1, Ordering::Relaxed);
489                let new_total = prev_total + 1;
490
491                // Update running averages using incremental formula
492                let new_avg_duration = if prev_total == 0 {
493                    loop_duration_ns
494                } else {
495                    let prev_avg = service
496                        .tick_stats
497                        .avg_tick_loop_duration_ns
498                        .load(Ordering::Relaxed) as i64;
499                    let new_avg =
500                        prev_avg + ((loop_duration_ns as i64 - prev_avg) / new_total as i64);
501                    new_avg as u64
502                };
503                service
504                    .tick_stats
505                    .avg_tick_loop_duration_ns
506                    .store(new_avg_duration, Ordering::Relaxed);
507
508                let new_avg_sleep = if prev_total == 0 {
509                    actual_sleep_ns
510                } else {
511                    let prev_avg = service
512                        .tick_stats
513                        .avg_tick_loop_sleep_ns
514                        .load(Ordering::Relaxed) as i64;
515                    let new_avg =
516                        prev_avg + ((actual_sleep_ns as i64 - prev_avg) / new_total as i64);
517                    new_avg as u64
518                };
519                service
520                    .tick_stats
521                    .avg_tick_loop_sleep_ns
522                    .store(new_avg_sleep, Ordering::Relaxed);
523
524                // Track max drift using compare-exchange loop (store absolute value)
525                let drift_abs = drift_ns_i64.unsigned_abs();
526                let mut current_max = service.tick_stats.max_drift_ns.load(Ordering::Relaxed);
527                while drift_abs > current_max {
528                    match service.tick_stats.max_drift_ns.compare_exchange_weak(
529                        current_max,
530                        drift_abs,
531                        Ordering::Relaxed,
532                        Ordering::Relaxed,
533                    ) {
534                        Ok(_) => break,
535                        Err(actual) => current_max = actual,
536                    }
537                }
538
539                // Accumulate total drift
540                service
541                    .tick_stats
542                    .total_drift_ns
543                    .fetch_add(drift_ns_i64, Ordering::Relaxed);
544            }
545        });
546
547        *tick_thread_guard = Some(handle);
548    }
549
550    /// Shutdown the tick service and wait for the thread to exit.
551    /// This is idempotent - multiple calls are safe.
552    pub fn shutdown(&self) {
553        // swap returns the OLD value - if it was already true, we've already shut down
554        if self.shutdown.swap(true, Ordering::Release) {
555            return; // Already shutting down or shut down
556        }
557
558        // Join tick thread
559        if let Some(handle) = self.tick_thread.lock().expect("tick_thread lock poisoned").take() {
560            let _ = handle.join();
561        }
562    }
563
    /// Returns a snapshot of the current tick thread statistics
    pub fn tick_stats(&self) -> TickStatsSnapshot {
        self.tick_stats.snapshot()
    }

    /// Returns the current tick duration in nanoseconds
    ///
    /// This is the dynamically recalculated base tick, which may differ from
    /// the default passed to `new()` once handlers are registered.
    pub fn current_tick_duration_ns(&self) -> u64 {
        self.tick_duration_ns.load(Ordering::Acquire)
    }

    /// Set the shutdown timeout for handlers
    ///
    /// This bounds the total time spent invoking `on_shutdown()` callbacks;
    /// durations beyond u64::MAX nanoseconds saturate.
    pub fn set_shutdown_timeout(&self, timeout: Duration) {
        let timeout_ns = timeout.as_nanos().min(u128::from(u64::MAX)) as u64;
        self.shutdown_timeout_ns.store(timeout_ns, Ordering::Relaxed);
    }

    /// Get error statistics for the tick service
    ///
    /// Returns the number of handler panics observed so far, together with a
    /// snapshot of the aggregate tick thread statistics.
    pub fn error_stats(&self) -> (u64, TickStatsSnapshot) {
        (self.error_count.load(Ordering::Relaxed), self.tick_stats.snapshot())
    }

    /// Enable or disable panic logging for handlers
    pub fn set_panic_logging(&self, enabled: bool) {
        self.log_handler_panics.store(enabled, Ordering::Relaxed);
    }

    /// Check if panic logging is enabled
    pub fn is_panic_logging_enabled(&self) -> bool {
        self.log_handler_panics.load(Ordering::Relaxed)
    }
595
596    /// Returns a vector of tick statistics for each registered handler
597    pub fn handler_stats(&self) -> Vec<TickStatsSnapshot> {
598        let handlers = self.handlers.lock().expect("handlers lock poisoned");
599        handlers
600            .values()
601            .map(|state| state.stats.snapshot())
602            .collect()
603    }
604}
605
606impl Drop for TickService {
607    fn drop(&mut self) {
608        if !self.shutdown.load(Ordering::Acquire) {
609            self.shutdown();
610        }
611    }
612}