1#![forbid(unsafe_code)]
2
3use std::sync::atomic::{AtomicU64, Ordering};
17use web_time::Instant;
18
19static EFFECTS_COMMAND_TOTAL: AtomicU64 = AtomicU64::new(0);
24static EFFECTS_SUBSCRIPTION_TOTAL: AtomicU64 = AtomicU64::new(0);
25static EFFECTS_QUEUE_ENQUEUED: AtomicU64 = AtomicU64::new(0);
26static EFFECTS_QUEUE_PROCESSED: AtomicU64 = AtomicU64::new(0);
27static EFFECTS_QUEUE_DROPPED: AtomicU64 = AtomicU64::new(0);
28static EFFECTS_QUEUE_HIGH_WATER: AtomicU64 = AtomicU64::new(0);
29
30#[must_use]
32pub fn effects_command_total() -> u64 {
33 EFFECTS_COMMAND_TOTAL.load(Ordering::Relaxed)
34}
35
36#[must_use]
38pub fn effects_subscription_total() -> u64 {
39 EFFECTS_SUBSCRIPTION_TOTAL.load(Ordering::Relaxed)
40}
41
42#[must_use]
44pub fn effects_executed_total() -> u64 {
45 effects_command_total() + effects_subscription_total()
46}
47
48#[must_use]
54pub fn effects_queue_enqueued() -> u64 {
55 EFFECTS_QUEUE_ENQUEUED.load(Ordering::Relaxed)
56}
57
58#[must_use]
60pub fn effects_queue_processed() -> u64 {
61 EFFECTS_QUEUE_PROCESSED.load(Ordering::Relaxed)
62}
63
64#[must_use]
66pub fn effects_queue_dropped() -> u64 {
67 EFFECTS_QUEUE_DROPPED.load(Ordering::Relaxed)
68}
69
70#[must_use]
72pub fn effects_queue_high_water() -> u64 {
73 EFFECTS_QUEUE_HIGH_WATER.load(Ordering::Relaxed)
74}
75
76pub fn record_queue_enqueue(current_depth: u64) {
78 EFFECTS_QUEUE_ENQUEUED.fetch_add(1, Ordering::Relaxed);
79 let mut prev = EFFECTS_QUEUE_HIGH_WATER.load(Ordering::Relaxed);
81 while current_depth > prev {
82 match EFFECTS_QUEUE_HIGH_WATER.compare_exchange_weak(
83 prev,
84 current_depth,
85 Ordering::Relaxed,
86 Ordering::Relaxed,
87 ) {
88 Ok(_) => break,
89 Err(actual) => prev = actual,
90 }
91 }
92}
93
94pub fn record_queue_processed() {
96 EFFECTS_QUEUE_PROCESSED.fetch_add(1, Ordering::Relaxed);
97}
98
99pub fn record_queue_drop(reason: &str) {
101 EFFECTS_QUEUE_DROPPED.fetch_add(1, Ordering::Relaxed);
102 tracing::warn!(
103 target: "ftui.effect",
104 reason = reason,
105 monotonic.counter.effects_queue_dropped_total = 1_u64,
106 "effect queue task dropped"
107 );
108}
109
110#[derive(Debug, Clone, Copy, PartialEq, Eq)]
112pub struct QueueTelemetry {
113 pub enqueued: u64,
115 pub processed: u64,
117 pub dropped: u64,
119 pub high_water: u64,
121 pub in_flight: u64,
123}
124
125#[must_use]
138pub fn queue_telemetry() -> QueueTelemetry {
139 let enqueued = effects_queue_enqueued();
140 let processed = effects_queue_processed();
141 let dropped = effects_queue_dropped();
142 let in_flight = enqueued.saturating_sub(processed).saturating_sub(dropped);
143 QueueTelemetry {
144 enqueued,
145 processed,
146 dropped,
147 high_water: effects_queue_high_water(),
148 in_flight,
149 }
150}
151
152static SUBSCRIPTION_STARTS_TOTAL: AtomicU64 = AtomicU64::new(0);
160static SUBSCRIPTION_STOPS_TOTAL: AtomicU64 = AtomicU64::new(0);
161static SUBSCRIPTION_PANICS_TOTAL: AtomicU64 = AtomicU64::new(0);
162static RECONCILE_COUNT: AtomicU64 = AtomicU64::new(0);
163static RECONCILE_DURATION_US_TOTAL: AtomicU64 = AtomicU64::new(0);
164static SHUTDOWN_DURATION_US_LAST: AtomicU64 = AtomicU64::new(0);
165static SHUTDOWN_TIMED_OUT_TOTAL: AtomicU64 = AtomicU64::new(0);
166
167#[must_use]
169pub fn subscription_starts_total() -> u64 {
170 SUBSCRIPTION_STARTS_TOTAL.load(Ordering::Relaxed)
171}
172
173#[must_use]
175pub fn subscription_stops_total() -> u64 {
176 SUBSCRIPTION_STOPS_TOTAL.load(Ordering::Relaxed)
177}
178
179#[must_use]
181pub fn subscription_panics_total() -> u64 {
182 SUBSCRIPTION_PANICS_TOTAL.load(Ordering::Relaxed)
183}
184
185#[must_use]
187pub fn reconcile_count() -> u64 {
188 RECONCILE_COUNT.load(Ordering::Relaxed)
189}
190
191#[must_use]
193pub fn reconcile_duration_us_total() -> u64 {
194 RECONCILE_DURATION_US_TOTAL.load(Ordering::Relaxed)
195}
196
197#[must_use]
199pub fn shutdown_duration_us_last() -> u64 {
200 SHUTDOWN_DURATION_US_LAST.load(Ordering::Relaxed)
201}
202
203#[must_use]
205pub fn shutdown_timed_out_total() -> u64 {
206 SHUTDOWN_TIMED_OUT_TOTAL.load(Ordering::Relaxed)
207}
208
209pub fn record_dynamics_sub_start() {
211 SUBSCRIPTION_STARTS_TOTAL.fetch_add(1, Ordering::Relaxed);
212}
213
214pub fn record_dynamics_sub_stop() {
216 SUBSCRIPTION_STOPS_TOTAL.fetch_add(1, Ordering::Relaxed);
217}
218
219pub fn record_dynamics_sub_panic() {
221 SUBSCRIPTION_PANICS_TOTAL.fetch_add(1, Ordering::Relaxed);
222}
223
224pub fn record_dynamics_reconcile(duration_us: u64) {
226 RECONCILE_COUNT.fetch_add(1, Ordering::Relaxed);
227 RECONCILE_DURATION_US_TOTAL.fetch_add(duration_us, Ordering::Relaxed);
228}
229
230pub fn record_dynamics_shutdown(duration_us: u64, timed_out: u64) {
232 SHUTDOWN_DURATION_US_LAST.store(duration_us, Ordering::Relaxed);
233 SHUTDOWN_TIMED_OUT_TOTAL.fetch_add(timed_out, Ordering::Relaxed);
234}
235
236#[derive(Debug, Clone, Copy, PartialEq, Eq)]
238pub struct RuntimeDynamics {
239 pub sub_starts: u64,
241 pub sub_stops: u64,
243 pub sub_panics: u64,
245 pub sub_active_estimate: u64,
247 pub reconciles: u64,
249 pub reconcile_avg_us: u64,
251 pub shutdown_last_us: u64,
253 pub shutdown_timeouts: u64,
255}
256
257#[must_use]
259pub fn runtime_dynamics() -> RuntimeDynamics {
260 let sub_starts = subscription_starts_total();
261 let sub_stops = subscription_stops_total();
262 let reconciles = reconcile_count();
263 let reconcile_total_us = reconcile_duration_us_total();
264 RuntimeDynamics {
265 sub_starts,
266 sub_stops,
267 sub_panics: subscription_panics_total(),
268 sub_active_estimate: sub_starts.saturating_sub(sub_stops),
269 reconciles,
270 reconcile_avg_us: reconcile_total_us.checked_div(reconciles).unwrap_or(0),
271 shutdown_last_us: shutdown_duration_us_last(),
272 shutdown_timeouts: shutdown_timed_out_total(),
273 }
274}
275
276pub fn trace_command_effect<F, R>(command_type: &str, f: F) -> R
285where
286 F: FnOnce() -> R,
287{
288 EFFECTS_COMMAND_TOTAL.fetch_add(1, Ordering::Relaxed);
289
290 let start = Instant::now();
291 let _span = tracing::debug_span!(
292 "effect.command",
293 command_type = %command_type,
294 duration_us = tracing::field::Empty,
295 result = tracing::field::Empty,
296 )
297 .entered();
298
299 tracing::debug!(
300 target: "ftui.effect",
301 command_type = %command_type,
302 "command effect started"
303 );
304
305 let result = f();
306 let duration_us = start.elapsed().as_micros() as u64;
307
308 tracing::debug!(
309 target: "ftui.effect",
310 command_type = %command_type,
311 duration_us = duration_us,
312 effect_duration_us = duration_us,
313 "command effect completed"
314 );
315
316 result
317}
318
319pub fn record_command_effect(command_type: &str, duration_us: u64) {
321 EFFECTS_COMMAND_TOTAL.fetch_add(1, Ordering::Relaxed);
322
323 let _span = tracing::debug_span!(
324 "effect.command",
325 command_type = %command_type,
326 duration_us = duration_us,
327 result = "ok",
328 )
329 .entered();
330
331 tracing::debug!(
332 target: "ftui.effect",
333 command_type = %command_type,
334 duration_us = duration_us,
335 effect_duration_us = duration_us,
336 "command effect recorded"
337 );
338}
339
340pub fn record_subscription_start(sub_type: &str, sub_id: u64) {
346 EFFECTS_SUBSCRIPTION_TOTAL.fetch_add(1, Ordering::Relaxed);
347
348 let _span = tracing::debug_span!(
349 "effect.subscription",
350 sub_type = %sub_type,
351 event_count = 0u64,
352 active = true,
353 )
354 .entered();
355
356 tracing::debug!(
357 target: "ftui.effect",
358 sub_type = %sub_type,
359 sub_id = sub_id,
360 active = true,
361 "subscription started"
362 );
363}
364
365pub fn record_subscription_stop(sub_type: &str, sub_id: u64, event_count: u64) {
367 let _span = tracing::debug_span!(
368 "effect.subscription",
369 sub_type = %sub_type,
370 event_count = event_count,
371 active = false,
372 )
373 .entered();
374
375 tracing::debug!(
376 target: "ftui.effect",
377 sub_type = %sub_type,
378 sub_id = sub_id,
379 event_count = event_count,
380 active = false,
381 "subscription stopped"
382 );
383}
384
385pub fn warn_effect_timeout(effect_type: &str, deadline_us: u64) {
387 tracing::warn!(
388 target: "ftui.effect",
389 effect_type = %effect_type,
390 deadline_us = deadline_us,
391 "effect timeout exceeded deadline"
392 );
393}
394
395pub fn error_effect_panic(effect_type: &str, panic_msg: &str) {
397 tracing::error!(
398 target: "ftui.effect",
399 effect_type = %effect_type,
400 panic_msg = %panic_msg,
401 "effect panicked during execution"
402 );
403}
404
405#[cfg(test)]
410mod tests {
411 use super::*;
412 use std::collections::HashMap;
413 use std::sync::{Arc, Mutex};
414 use tracing_subscriber::layer::SubscriberExt;
415 use tracing_subscriber::registry::LookupSpan;
416
417 #[derive(Debug, Clone)]
419 #[allow(dead_code)]
420 struct CapturedSpan {
421 name: String,
422 fields: HashMap<String, String>,
423 }
424
425 #[derive(Debug, Clone)]
426 #[allow(dead_code)]
427 struct CapturedEvent {
428 level: tracing::Level,
429 target: String,
430 fields: HashMap<String, String>,
431 }
432
433 struct SpanCapture {
434 spans: Arc<Mutex<Vec<CapturedSpan>>>,
435 events: Arc<Mutex<Vec<CapturedEvent>>>,
436 }
437
438 impl SpanCapture {
439 fn new() -> (Self, CaptureHandle) {
440 let spans = Arc::new(Mutex::new(Vec::new()));
441 let events = Arc::new(Mutex::new(Vec::new()));
442 let handle = CaptureHandle {
443 spans: spans.clone(),
444 events: events.clone(),
445 };
446 (Self { spans, events }, handle)
447 }
448 }
449
450 struct CaptureHandle {
451 spans: Arc<Mutex<Vec<CapturedSpan>>>,
452 events: Arc<Mutex<Vec<CapturedEvent>>>,
453 }
454
455 impl CaptureHandle {
456 fn spans(&self) -> Vec<CapturedSpan> {
457 self.spans.lock().unwrap().clone()
458 }
459
460 fn events(&self) -> Vec<CapturedEvent> {
461 self.events.lock().unwrap().clone()
462 }
463 }
464
465 struct FieldVisitor(Vec<(String, String)>);
466
467 impl tracing::field::Visit for FieldVisitor {
468 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
469 self.0
470 .push((field.name().to_string(), format!("{value:?}")));
471 }
472 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
473 self.0.push((field.name().to_string(), value.to_string()));
474 }
475 fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
476 self.0.push((field.name().to_string(), value.to_string()));
477 }
478 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
479 self.0.push((field.name().to_string(), value.to_string()));
480 }
481 fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
482 self.0.push((field.name().to_string(), value.to_string()));
483 }
484 }
485
486 impl<S> tracing_subscriber::Layer<S> for SpanCapture
487 where
488 S: tracing::Subscriber + for<'a> LookupSpan<'a>,
489 {
490 fn on_new_span(
491 &self,
492 attrs: &tracing::span::Attributes<'_>,
493 _id: &tracing::span::Id,
494 _ctx: tracing_subscriber::layer::Context<'_, S>,
495 ) {
496 let mut visitor = FieldVisitor(Vec::new());
497 attrs.record(&mut visitor);
498 let mut fields: HashMap<String, String> = visitor.0.into_iter().collect();
499 for field in attrs.metadata().fields() {
500 fields.entry(field.name().to_string()).or_default();
501 }
502 self.spans.lock().unwrap().push(CapturedSpan {
503 name: attrs.metadata().name().to_string(),
504 fields,
505 });
506 }
507
508 fn on_event(
509 &self,
510 event: &tracing::Event<'_>,
511 _ctx: tracing_subscriber::layer::Context<'_, S>,
512 ) {
513 let mut visitor = FieldVisitor(Vec::new());
514 event.record(&mut visitor);
515 let fields: HashMap<String, String> = visitor.0.into_iter().collect();
516 self.events.lock().unwrap().push(CapturedEvent {
517 level: *event.metadata().level(),
518 target: event.metadata().target().to_string(),
519 fields,
520 });
521 }
522 }
523
524 fn with_captured_tracing<F>(f: F) -> CaptureHandle
525 where
526 F: FnOnce(),
527 {
528 let (layer, handle) = SpanCapture::new();
529 let subscriber = tracing_subscriber::registry().with(layer);
530 tracing::subscriber::with_default(subscriber, f);
531 handle
532 }
533
534 #[test]
539 fn trace_command_effect_emits_span() {
540 let handle = with_captured_tracing(|| {
541 trace_command_effect("task", || 42);
542 });
543
544 let spans = handle.spans();
545 let cmd_spans: Vec<_> = spans
546 .iter()
547 .filter(|s| s.name == "effect.command")
548 .collect();
549 assert!(!cmd_spans.is_empty(), "expected effect.command span");
550 assert!(cmd_spans[0].fields.contains_key("command_type"));
551 }
552
553 #[test]
554 fn trace_command_effect_returns_value() {
555 let result = trace_command_effect("test", || 42);
556 assert_eq!(result, 42);
557 }
558
559 #[test]
560 fn trace_command_effect_debug_events() {
561 let handle = with_captured_tracing(|| {
562 trace_command_effect("file_io", || {});
563 });
564
565 let events = handle.events();
566 let start_events: Vec<_> = events
567 .iter()
568 .filter(|e| {
569 e.target == "ftui.effect"
570 && e.fields
571 .get("message")
572 .is_some_and(|m| m.contains("started"))
573 })
574 .collect();
575 assert!(!start_events.is_empty(), "expected start event");
576
577 let complete_events: Vec<_> = events
578 .iter()
579 .filter(|e| {
580 e.target == "ftui.effect"
581 && e.fields
582 .get("message")
583 .is_some_and(|m| m.contains("completed"))
584 })
585 .collect();
586 assert!(!complete_events.is_empty(), "expected complete event");
587
588 let evt = &complete_events[0];
589 assert!(
590 evt.fields.contains_key("duration_us"),
591 "missing duration_us"
592 );
593 assert!(
594 evt.fields.contains_key("effect_duration_us"),
595 "missing effect_duration_us histogram"
596 );
597 }
598
599 #[test]
600 fn record_command_effect_emits_span() {
601 let handle = with_captured_tracing(|| {
602 record_command_effect("clipboard", 150);
603 });
604
605 let spans = handle.spans();
606 let cmd_spans: Vec<_> = spans
607 .iter()
608 .filter(|s| s.name == "effect.command")
609 .collect();
610 assert!(!cmd_spans.is_empty());
611 assert_eq!(
612 cmd_spans[0].fields.get("command_type").unwrap(),
613 "clipboard"
614 );
615 }
616
617 #[test]
622 fn record_subscription_start_emits_span() {
623 let handle = with_captured_tracing(|| {
624 record_subscription_start("timer", 42);
625 });
626
627 let spans = handle.spans();
628 let sub_spans: Vec<_> = spans
629 .iter()
630 .filter(|s| s.name == "effect.subscription")
631 .collect();
632 assert!(!sub_spans.is_empty(), "expected effect.subscription span");
633 assert!(sub_spans[0].fields.contains_key("sub_type"));
634 assert!(sub_spans[0].fields.contains_key("active"));
635 }
636
637 #[test]
638 fn record_subscription_stop_emits_span() {
639 let handle = with_captured_tracing(|| {
640 record_subscription_stop("keyboard", 7, 100);
641 });
642
643 let spans = handle.spans();
644 let sub_spans: Vec<_> = spans
645 .iter()
646 .filter(|s| s.name == "effect.subscription")
647 .collect();
648 assert!(!sub_spans.is_empty());
649 assert!(sub_spans[0].fields.contains_key("event_count"));
650 }
651
652 #[test]
657 fn warn_effect_timeout_emits_warn_event() {
658 let handle = with_captured_tracing(|| {
659 warn_effect_timeout("task", 500_000);
660 });
661
662 let events = handle.events();
663 let warn_events: Vec<_> = events
664 .iter()
665 .filter(|e| e.level == tracing::Level::WARN && e.target == "ftui.effect")
666 .collect();
667 assert!(!warn_events.is_empty(), "expected WARN event for timeout");
668 }
669
670 #[test]
671 fn error_effect_panic_emits_error_event() {
672 let handle = with_captured_tracing(|| {
673 error_effect_panic("subscription", "thread panicked");
674 });
675
676 let events = handle.events();
677 let error_events: Vec<_> = events
678 .iter()
679 .filter(|e| e.level == tracing::Level::ERROR && e.target == "ftui.effect")
680 .collect();
681 assert!(!error_events.is_empty(), "expected ERROR event for panic");
682 }
683
684 #[test]
689 fn counter_accessors_callable() {
690 let cmd = effects_command_total();
691 let sub = effects_subscription_total();
692 let total = effects_executed_total();
693 assert_eq!(total, cmd + sub);
694 }
695
696 #[test]
697 fn counters_increment_on_command() {
698 let before = effects_command_total();
699 trace_command_effect("test", || {});
700 let after = effects_command_total();
701 assert!(
702 after > before,
703 "command counter should increment: {before} → {after}"
704 );
705 }
706
707 #[test]
708 fn counters_increment_on_subscription() {
709 let before = effects_subscription_total();
710 record_subscription_start("test", 1);
711 let after = effects_subscription_total();
712 assert!(
713 after > before,
714 "subscription counter should increment: {before} → {after}"
715 );
716 }
717
718 #[test]
723 fn queue_enqueue_increments_counter() {
724 let before = effects_queue_enqueued();
725 record_queue_enqueue(1);
726 let after = effects_queue_enqueued();
727 assert!(after > before, "enqueued counter should increment");
728 }
729
730 #[test]
731 fn queue_processed_increments_counter() {
732 let before = effects_queue_processed();
733 record_queue_processed();
734 let after = effects_queue_processed();
735 assert!(after > before, "processed counter should increment");
736 }
737
738 #[test]
739 fn queue_drop_increments_counter() {
740 let before = effects_queue_dropped();
741 record_queue_drop("test");
742 let after = effects_queue_dropped();
743 assert!(after > before, "dropped counter should increment");
744 }
745
746 #[test]
747 fn queue_high_water_ratchets_upward() {
748 let before = effects_queue_high_water();
749 let new_mark = before + 100;
750 record_queue_enqueue(new_mark);
751 assert!(
752 effects_queue_high_water() >= new_mark,
753 "high-water should ratchet to at least {new_mark}"
754 );
755 record_queue_enqueue(1);
757 assert!(
758 effects_queue_high_water() >= new_mark,
759 "high-water should not decrease"
760 );
761 }
762
763 #[test]
764 fn queue_telemetry_snapshot_consistent() {
765 let snap = queue_telemetry();
766 assert_eq!(
768 snap.in_flight,
769 snap.enqueued
770 .saturating_sub(snap.processed)
771 .saturating_sub(snap.dropped),
772 "in_flight should be enqueued - processed - dropped"
773 );
774 }
775
776 #[test]
781 fn dynamics_sub_start_increments() {
782 let before = subscription_starts_total();
783 record_dynamics_sub_start();
784 let after = subscription_starts_total();
785 assert!(after > before);
786 }
787
788 #[test]
789 fn dynamics_sub_stop_increments() {
790 let before = subscription_stops_total();
791 record_dynamics_sub_stop();
792 let after = subscription_stops_total();
793 assert!(after > before);
794 }
795
796 #[test]
797 fn dynamics_sub_panic_increments() {
798 let before = subscription_panics_total();
799 record_dynamics_sub_panic();
800 let after = subscription_panics_total();
801 assert!(after > before);
802 }
803
804 #[test]
805 fn dynamics_reconcile_records_count_and_duration() {
806 let before_count = reconcile_count();
807 let before_dur = reconcile_duration_us_total();
808 record_dynamics_reconcile(500);
809 assert!(reconcile_count() > before_count);
810 assert!(reconcile_duration_us_total() >= before_dur + 500);
811 }
812
813 #[test]
814 fn dynamics_shutdown_records_duration() {
815 record_dynamics_shutdown(1234, 2);
816 assert_eq!(shutdown_duration_us_last(), 1234);
817 let timeouts = shutdown_timed_out_total();
818 assert!(timeouts >= 2);
819 }
820
821 #[test]
822 fn dynamics_snapshot_consistent() {
823 let snap = runtime_dynamics();
824 assert_eq!(
825 snap.sub_active_estimate,
826 snap.sub_starts.saturating_sub(snap.sub_stops),
827 "active estimate = starts - stops"
828 );
829 if snap.reconciles > 0 {
830 assert!(
831 snap.reconcile_avg_us > 0 || reconcile_duration_us_total() == 0,
832 "avg should be > 0 when reconciles happened with non-zero duration"
833 );
834 }
835 }
836}