1#![forbid(unsafe_code)]
2
3use std::sync::atomic::{AtomicU64, Ordering};
17use web_time::Instant;
18
/// Count of command effects, bumped by `trace_command_effect` and `record_command_effect`.
static EFFECTS_COMMAND_TOTAL: AtomicU64 = AtomicU64::new(0);
/// Count of subscription starts, bumped by `record_subscription_start`.
static EFFECTS_SUBSCRIPTION_TOTAL: AtomicU64 = AtomicU64::new(0);
/// Count of effect-queue enqueues, bumped by `record_queue_enqueue`.
static EFFECTS_QUEUE_ENQUEUED: AtomicU64 = AtomicU64::new(0);
/// Count of effect-queue tasks reported as processed via `record_queue_processed`.
static EFFECTS_QUEUE_PROCESSED: AtomicU64 = AtomicU64::new(0);
/// Count of effect-queue tasks reported as dropped via `record_queue_drop`.
static EFFECTS_QUEUE_DROPPED: AtomicU64 = AtomicU64::new(0);
/// Largest queue depth ever passed to `record_queue_enqueue`.
static EFFECTS_QUEUE_HIGH_WATER: AtomicU64 = AtomicU64::new(0);
29
30#[must_use]
32pub fn effects_command_total() -> u64 {
33 EFFECTS_COMMAND_TOTAL.load(Ordering::Relaxed)
34}
35
36#[must_use]
38pub fn effects_subscription_total() -> u64 {
39 EFFECTS_SUBSCRIPTION_TOTAL.load(Ordering::Relaxed)
40}
41
42#[must_use]
44pub fn effects_executed_total() -> u64 {
45 effects_command_total() + effects_subscription_total()
46}
47
48#[must_use]
54pub fn effects_queue_enqueued() -> u64 {
55 EFFECTS_QUEUE_ENQUEUED.load(Ordering::Relaxed)
56}
57
58#[must_use]
60pub fn effects_queue_processed() -> u64 {
61 EFFECTS_QUEUE_PROCESSED.load(Ordering::Relaxed)
62}
63
64#[must_use]
66pub fn effects_queue_dropped() -> u64 {
67 EFFECTS_QUEUE_DROPPED.load(Ordering::Relaxed)
68}
69
70#[must_use]
72pub fn effects_queue_high_water() -> u64 {
73 EFFECTS_QUEUE_HIGH_WATER.load(Ordering::Relaxed)
74}
75
76pub fn record_queue_enqueue(current_depth: u64) {
78 EFFECTS_QUEUE_ENQUEUED.fetch_add(1, Ordering::Relaxed);
79 let mut prev = EFFECTS_QUEUE_HIGH_WATER.load(Ordering::Relaxed);
81 while current_depth > prev {
82 match EFFECTS_QUEUE_HIGH_WATER.compare_exchange_weak(
83 prev,
84 current_depth,
85 Ordering::Relaxed,
86 Ordering::Relaxed,
87 ) {
88 Ok(_) => break,
89 Err(actual) => prev = actual,
90 }
91 }
92}
93
94pub fn record_queue_processed() {
96 EFFECTS_QUEUE_PROCESSED.fetch_add(1, Ordering::Relaxed);
97}
98
99pub fn record_queue_drop(reason: &str) {
101 EFFECTS_QUEUE_DROPPED.fetch_add(1, Ordering::Relaxed);
102 tracing::warn!(
103 target: "ftui.effect",
104 reason = reason,
105 monotonic.counter.effects_queue_dropped_total = 1_u64,
106 "effect queue task dropped"
107 );
108}
109
/// Snapshot of the effect-queue counters, as assembled by `queue_telemetry`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct QueueTelemetry {
    /// Value of the enqueue counter when the snapshot was taken.
    pub enqueued: u64,
    /// Value of the processed counter when the snapshot was taken.
    pub processed: u64,
    /// Value of the dropped counter when the snapshot was taken.
    pub dropped: u64,
    /// Queue high-water mark when the snapshot was taken.
    pub high_water: u64,
    /// `enqueued - processed - dropped`, saturating at zero.
    pub in_flight: u64,
}
124
125#[must_use]
127pub fn queue_telemetry() -> QueueTelemetry {
128 let enqueued = effects_queue_enqueued();
129 let processed = effects_queue_processed();
130 let dropped = effects_queue_dropped();
131 let in_flight = enqueued.saturating_sub(processed).saturating_sub(dropped);
132 QueueTelemetry {
133 enqueued,
134 processed,
135 dropped,
136 high_water: effects_queue_high_water(),
137 in_flight,
138 }
139}
140
/// Count of subscription starts reported via `record_dynamics_sub_start`.
static SUBSCRIPTION_STARTS_TOTAL: AtomicU64 = AtomicU64::new(0);
/// Count of subscription stops reported via `record_dynamics_sub_stop`.
static SUBSCRIPTION_STOPS_TOTAL: AtomicU64 = AtomicU64::new(0);
/// Count of subscription panics reported via `record_dynamics_sub_panic`.
static SUBSCRIPTION_PANICS_TOTAL: AtomicU64 = AtomicU64::new(0);
/// Number of reconciles reported via `record_dynamics_reconcile`.
static RECONCILE_COUNT: AtomicU64 = AtomicU64::new(0);
/// Sum of the durations passed to `record_dynamics_reconcile`.
static RECONCILE_DURATION_US_TOTAL: AtomicU64 = AtomicU64::new(0);
/// Duration from the most recent `record_dynamics_shutdown` call (overwritten each time).
static SHUTDOWN_DURATION_US_LAST: AtomicU64 = AtomicU64::new(0);
/// Running sum of the `timed_out` values passed to `record_dynamics_shutdown`.
static SHUTDOWN_TIMED_OUT_TOTAL: AtomicU64 = AtomicU64::new(0);
155
156#[must_use]
158pub fn subscription_starts_total() -> u64 {
159 SUBSCRIPTION_STARTS_TOTAL.load(Ordering::Relaxed)
160}
161
162#[must_use]
164pub fn subscription_stops_total() -> u64 {
165 SUBSCRIPTION_STOPS_TOTAL.load(Ordering::Relaxed)
166}
167
168#[must_use]
170pub fn subscription_panics_total() -> u64 {
171 SUBSCRIPTION_PANICS_TOTAL.load(Ordering::Relaxed)
172}
173
174#[must_use]
176pub fn reconcile_count() -> u64 {
177 RECONCILE_COUNT.load(Ordering::Relaxed)
178}
179
180#[must_use]
182pub fn reconcile_duration_us_total() -> u64 {
183 RECONCILE_DURATION_US_TOTAL.load(Ordering::Relaxed)
184}
185
186#[must_use]
188pub fn shutdown_duration_us_last() -> u64 {
189 SHUTDOWN_DURATION_US_LAST.load(Ordering::Relaxed)
190}
191
192#[must_use]
194pub fn shutdown_timed_out_total() -> u64 {
195 SHUTDOWN_TIMED_OUT_TOTAL.load(Ordering::Relaxed)
196}
197
198pub fn record_dynamics_sub_start() {
200 SUBSCRIPTION_STARTS_TOTAL.fetch_add(1, Ordering::Relaxed);
201}
202
203pub fn record_dynamics_sub_stop() {
205 SUBSCRIPTION_STOPS_TOTAL.fetch_add(1, Ordering::Relaxed);
206}
207
208pub fn record_dynamics_sub_panic() {
210 SUBSCRIPTION_PANICS_TOTAL.fetch_add(1, Ordering::Relaxed);
211}
212
213pub fn record_dynamics_reconcile(duration_us: u64) {
215 RECONCILE_COUNT.fetch_add(1, Ordering::Relaxed);
216 RECONCILE_DURATION_US_TOTAL.fetch_add(duration_us, Ordering::Relaxed);
217}
218
219pub fn record_dynamics_shutdown(duration_us: u64, timed_out: u64) {
221 SHUTDOWN_DURATION_US_LAST.store(duration_us, Ordering::Relaxed);
222 SHUTDOWN_TIMED_OUT_TOTAL.fetch_add(timed_out, Ordering::Relaxed);
223}
224
/// Snapshot of the runtime-dynamics counters, as assembled by `runtime_dynamics`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RuntimeDynamics {
    /// Value of the subscription-starts counter when the snapshot was taken.
    pub sub_starts: u64,
    /// Value of the subscription-stops counter when the snapshot was taken.
    pub sub_stops: u64,
    /// Value of the subscription-panics counter when the snapshot was taken.
    pub sub_panics: u64,
    /// `sub_starts - sub_stops`, saturating at zero.
    pub sub_active_estimate: u64,
    /// Value of the reconcile counter when the snapshot was taken.
    pub reconciles: u64,
    /// Total reconcile duration divided by `reconciles` (integer division); zero when `reconciles` is zero.
    pub reconcile_avg_us: u64,
    /// Duration stored by the most recent `record_dynamics_shutdown` call.
    pub shutdown_last_us: u64,
    /// Running sum of the `timed_out` values passed to `record_dynamics_shutdown`.
    pub shutdown_timeouts: u64,
}
245
246#[must_use]
248pub fn runtime_dynamics() -> RuntimeDynamics {
249 let sub_starts = subscription_starts_total();
250 let sub_stops = subscription_stops_total();
251 let reconciles = reconcile_count();
252 let reconcile_total_us = reconcile_duration_us_total();
253 RuntimeDynamics {
254 sub_starts,
255 sub_stops,
256 sub_panics: subscription_panics_total(),
257 sub_active_estimate: sub_starts.saturating_sub(sub_stops),
258 reconciles,
259 reconcile_avg_us: reconcile_total_us.checked_div(reconciles).unwrap_or(0),
260 shutdown_last_us: shutdown_duration_us_last(),
261 shutdown_timeouts: shutdown_timed_out_total(),
262 }
263}
264
265pub fn trace_command_effect<F, R>(command_type: &str, f: F) -> R
274where
275 F: FnOnce() -> R,
276{
277 EFFECTS_COMMAND_TOTAL.fetch_add(1, Ordering::Relaxed);
278
279 let start = Instant::now();
280 let _span = tracing::debug_span!(
281 "effect.command",
282 command_type = %command_type,
283 duration_us = tracing::field::Empty,
284 result = tracing::field::Empty,
285 )
286 .entered();
287
288 tracing::debug!(
289 target: "ftui.effect",
290 command_type = %command_type,
291 "command effect started"
292 );
293
294 let result = f();
295 let duration_us = start.elapsed().as_micros() as u64;
296
297 tracing::debug!(
298 target: "ftui.effect",
299 command_type = %command_type,
300 duration_us = duration_us,
301 effect_duration_us = duration_us,
302 "command effect completed"
303 );
304
305 result
306}
307
308pub fn record_command_effect(command_type: &str, duration_us: u64) {
310 EFFECTS_COMMAND_TOTAL.fetch_add(1, Ordering::Relaxed);
311
312 let _span = tracing::debug_span!(
313 "effect.command",
314 command_type = %command_type,
315 duration_us = duration_us,
316 result = "ok",
317 )
318 .entered();
319
320 tracing::debug!(
321 target: "ftui.effect",
322 command_type = %command_type,
323 duration_us = duration_us,
324 effect_duration_us = duration_us,
325 "command effect recorded"
326 );
327}
328
329pub fn record_subscription_start(sub_type: &str, sub_id: u64) {
335 EFFECTS_SUBSCRIPTION_TOTAL.fetch_add(1, Ordering::Relaxed);
336
337 let _span = tracing::debug_span!(
338 "effect.subscription",
339 sub_type = %sub_type,
340 event_count = 0u64,
341 active = true,
342 )
343 .entered();
344
345 tracing::debug!(
346 target: "ftui.effect",
347 sub_type = %sub_type,
348 sub_id = sub_id,
349 active = true,
350 "subscription started"
351 );
352}
353
354pub fn record_subscription_stop(sub_type: &str, sub_id: u64, event_count: u64) {
356 let _span = tracing::debug_span!(
357 "effect.subscription",
358 sub_type = %sub_type,
359 event_count = event_count,
360 active = false,
361 )
362 .entered();
363
364 tracing::debug!(
365 target: "ftui.effect",
366 sub_type = %sub_type,
367 sub_id = sub_id,
368 event_count = event_count,
369 active = false,
370 "subscription stopped"
371 );
372}
373
374pub fn warn_effect_timeout(effect_type: &str, deadline_us: u64) {
376 tracing::warn!(
377 target: "ftui.effect",
378 effect_type = %effect_type,
379 deadline_us = deadline_us,
380 "effect timeout exceeded deadline"
381 );
382}
383
384pub fn error_effect_panic(effect_type: &str, panic_msg: &str) {
386 tracing::error!(
387 target: "ftui.effect",
388 effect_type = %effect_type,
389 panic_msg = %panic_msg,
390 "effect panicked during execution"
391 );
392}
393
#[cfg(test)]
mod tests {
    //! Tests for the effect telemetry helpers.
    //!
    //! The counters under test are process-global and the test harness runs
    //! tests in parallel, so counter assertions are written as monotonic
    //! bounds (`>`, `>=`, `<=`) rather than exact equalities wherever another
    //! test may touch the same counter.

    use super::*;
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};
    use tracing_subscriber::layer::SubscriberExt;
    use tracing_subscriber::registry::LookupSpan;

    /// A span observed by [`SpanCapture`]: its name plus stringified fields.
    #[derive(Debug, Clone)]
    // Tolerate fields that no test currently reads.
    #[allow(dead_code)]
    struct CapturedSpan {
        name: String,
        fields: HashMap<String, String>,
    }

    /// An event observed by [`SpanCapture`]: level, target and stringified fields.
    #[derive(Debug, Clone)]
    // Tolerate fields that no test currently reads.
    #[allow(dead_code)]
    struct CapturedEvent {
        level: tracing::Level,
        target: String,
        fields: HashMap<String, String>,
    }

    /// Subscriber layer that copies every new span and event into shared vectors.
    struct SpanCapture {
        spans: Arc<Mutex<Vec<CapturedSpan>>>,
        events: Arc<Mutex<Vec<CapturedEvent>>>,
    }

    impl SpanCapture {
        /// Creates the layer together with a handle that shares its storage.
        fn new() -> (Self, CaptureHandle) {
            let spans = Arc::new(Mutex::new(Vec::new()));
            let events = Arc::new(Mutex::new(Vec::new()));
            let handle = CaptureHandle {
                spans: spans.clone(),
                events: events.clone(),
            };
            (Self { spans, events }, handle)
        }
    }

    /// Read side of [`SpanCapture`], usable after the subscriber is gone.
    struct CaptureHandle {
        spans: Arc<Mutex<Vec<CapturedSpan>>>,
        events: Arc<Mutex<Vec<CapturedEvent>>>,
    }

    impl CaptureHandle {
        /// Returns a copy of every span captured so far.
        fn spans(&self) -> Vec<CapturedSpan> {
            self.spans.lock().unwrap().clone()
        }

        /// Returns a copy of every event captured so far.
        fn events(&self) -> Vec<CapturedEvent> {
            self.events.lock().unwrap().clone()
        }
    }

    /// Field visitor that stringifies every recorded value as `(name, value)`.
    struct FieldVisitor(Vec<(String, String)>);

    impl tracing::field::Visit for FieldVisitor {
        fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
            self.0
                .push((field.name().to_string(), format!("{value:?}")));
        }
        fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
            self.0.push((field.name().to_string(), value.to_string()));
        }
        fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
            self.0.push((field.name().to_string(), value.to_string()));
        }
        fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
            self.0.push((field.name().to_string(), value.to_string()));
        }
        fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
            self.0.push((field.name().to_string(), value.to_string()));
        }
    }

    impl<S> tracing_subscriber::Layer<S> for SpanCapture
    where
        S: tracing::Subscriber + for<'a> LookupSpan<'a>,
    {
        fn on_new_span(
            &self,
            attrs: &tracing::span::Attributes<'_>,
            _id: &tracing::span::Id,
            _ctx: tracing_subscriber::layer::Context<'_, S>,
        ) {
            let mut visitor = FieldVisitor(Vec::new());
            attrs.record(&mut visitor);
            let mut fields: HashMap<String, String> = visitor.0.into_iter().collect();
            // Fields declared on the span but not given a value at creation
            // are still listed, mapped to an empty string.
            for field in attrs.metadata().fields() {
                fields.entry(field.name().to_string()).or_default();
            }
            self.spans.lock().unwrap().push(CapturedSpan {
                name: attrs.metadata().name().to_string(),
                fields,
            });
        }

        fn on_event(
            &self,
            event: &tracing::Event<'_>,
            _ctx: tracing_subscriber::layer::Context<'_, S>,
        ) {
            let mut visitor = FieldVisitor(Vec::new());
            event.record(&mut visitor);
            let fields: HashMap<String, String> = visitor.0.into_iter().collect();
            self.events.lock().unwrap().push(CapturedEvent {
                level: *event.metadata().level(),
                target: event.metadata().target().to_string(),
                fields,
            });
        }
    }

    /// Runs `f` with a capturing subscriber installed as the default for the
    /// duration of the call and returns the handle holding what was captured.
    fn with_captured_tracing<F>(f: F) -> CaptureHandle
    where
        F: FnOnce(),
    {
        let (layer, handle) = SpanCapture::new();
        let subscriber = tracing_subscriber::registry().with(layer);
        tracing::subscriber::with_default(subscriber, f);
        handle
    }

    // --- Span and event emission -------------------------------------------

    #[test]
    fn trace_command_effect_emits_span() {
        let handle = with_captured_tracing(|| {
            trace_command_effect("task", || 42);
        });

        let spans = handle.spans();
        let cmd_spans: Vec<_> = spans
            .iter()
            .filter(|s| s.name == "effect.command")
            .collect();
        assert!(!cmd_spans.is_empty(), "expected effect.command span");
        assert!(cmd_spans[0].fields.contains_key("command_type"));
    }

    #[test]
    fn trace_command_effect_returns_value() {
        let result = trace_command_effect("test", || 42);
        assert_eq!(result, 42);
    }

    #[test]
    fn trace_command_effect_debug_events() {
        let handle = with_captured_tracing(|| {
            trace_command_effect("file_io", || {});
        });

        let events = handle.events();
        let start_events: Vec<_> = events
            .iter()
            .filter(|e| {
                e.target == "ftui.effect"
                    && e.fields
                        .get("message")
                        .is_some_and(|m| m.contains("started"))
            })
            .collect();
        assert!(!start_events.is_empty(), "expected start event");

        let complete_events: Vec<_> = events
            .iter()
            .filter(|e| {
                e.target == "ftui.effect"
                    && e.fields
                        .get("message")
                        .is_some_and(|m| m.contains("completed"))
            })
            .collect();
        assert!(!complete_events.is_empty(), "expected complete event");

        let evt = &complete_events[0];
        assert!(
            evt.fields.contains_key("duration_us"),
            "missing duration_us"
        );
        assert!(
            evt.fields.contains_key("effect_duration_us"),
            "missing effect_duration_us histogram"
        );
    }

    #[test]
    fn record_command_effect_emits_span() {
        let handle = with_captured_tracing(|| {
            record_command_effect("clipboard", 150);
        });

        let spans = handle.spans();
        let cmd_spans: Vec<_> = spans
            .iter()
            .filter(|s| s.name == "effect.command")
            .collect();
        assert!(!cmd_spans.is_empty());
        assert_eq!(
            cmd_spans[0].fields.get("command_type").unwrap(),
            "clipboard"
        );
    }

    #[test]
    fn record_subscription_start_emits_span() {
        let handle = with_captured_tracing(|| {
            record_subscription_start("timer", 42);
        });

        let spans = handle.spans();
        let sub_spans: Vec<_> = spans
            .iter()
            .filter(|s| s.name == "effect.subscription")
            .collect();
        assert!(!sub_spans.is_empty(), "expected effect.subscription span");
        assert!(sub_spans[0].fields.contains_key("sub_type"));
        assert!(sub_spans[0].fields.contains_key("active"));
    }

    #[test]
    fn record_subscription_stop_emits_span() {
        let handle = with_captured_tracing(|| {
            record_subscription_stop("keyboard", 7, 100);
        });

        let spans = handle.spans();
        let sub_spans: Vec<_> = spans
            .iter()
            .filter(|s| s.name == "effect.subscription")
            .collect();
        assert!(!sub_spans.is_empty());
        assert!(sub_spans[0].fields.contains_key("event_count"));
    }

    #[test]
    fn warn_effect_timeout_emits_warn_event() {
        let handle = with_captured_tracing(|| {
            warn_effect_timeout("task", 500_000);
        });

        let events = handle.events();
        let warn_events: Vec<_> = events
            .iter()
            .filter(|e| e.level == tracing::Level::WARN && e.target == "ftui.effect")
            .collect();
        assert!(!warn_events.is_empty(), "expected WARN event for timeout");
    }

    #[test]
    fn error_effect_panic_emits_error_event() {
        let handle = with_captured_tracing(|| {
            error_effect_panic("subscription", "thread panicked");
        });

        let events = handle.events();
        let error_events: Vec<_> = events
            .iter()
            .filter(|e| e.level == tracing::Level::ERROR && e.target == "ftui.effect")
            .collect();
        assert!(!error_events.is_empty(), "expected ERROR event for panic");
    }

    // --- Effect counters ----------------------------------------------------

    #[test]
    fn counter_accessors_callable() {
        let cmd = effects_command_total();
        let sub = effects_subscription_total();
        let total = effects_executed_total();
        // Other tests bump these global counters in parallel, so the total
        // (read last) can only be bounded from below by the earlier reads.
        assert!(
            total >= cmd.saturating_add(sub),
            "executed total {total} should cover command {cmd} + subscription {sub}"
        );
    }

    #[test]
    fn counters_increment_on_command() {
        let before = effects_command_total();
        trace_command_effect("test", || {});
        let after = effects_command_total();
        assert!(
            after > before,
            "command counter should increment: {before} → {after}"
        );
    }

    #[test]
    fn counters_increment_on_subscription() {
        let before = effects_subscription_total();
        record_subscription_start("test", 1);
        let after = effects_subscription_total();
        assert!(
            after > before,
            "subscription counter should increment: {before} → {after}"
        );
    }

    // --- Queue telemetry ----------------------------------------------------

    #[test]
    fn queue_enqueue_increments_counter() {
        let before = effects_queue_enqueued();
        record_queue_enqueue(1);
        let after = effects_queue_enqueued();
        assert!(after > before, "enqueued counter should increment");
    }

    #[test]
    fn queue_processed_increments_counter() {
        let before = effects_queue_processed();
        record_queue_processed();
        let after = effects_queue_processed();
        assert!(after > before, "processed counter should increment");
    }

    #[test]
    fn queue_drop_increments_counter() {
        let before = effects_queue_dropped();
        record_queue_drop("test");
        let after = effects_queue_dropped();
        assert!(after > before, "dropped counter should increment");
    }

    #[test]
    fn queue_high_water_ratchets_upward() {
        let before = effects_queue_high_water();
        let new_mark = before + 100;
        record_queue_enqueue(new_mark);
        assert!(
            effects_queue_high_water() >= new_mark,
            "high-water should ratchet to at least {new_mark}"
        );
        // A smaller depth must not lower the mark.
        record_queue_enqueue(1);
        assert!(
            effects_queue_high_water() >= new_mark,
            "high-water should not decrease"
        );
    }

    #[test]
    fn queue_telemetry_snapshot_consistent() {
        let snap = queue_telemetry();
        assert_eq!(
            snap.in_flight,
            snap.enqueued
                .saturating_sub(snap.processed)
                .saturating_sub(snap.dropped),
            "in_flight should be enqueued - processed - dropped"
        );
    }

    // --- Runtime dynamics ---------------------------------------------------

    #[test]
    fn dynamics_sub_start_increments() {
        let before = subscription_starts_total();
        record_dynamics_sub_start();
        let after = subscription_starts_total();
        assert!(after > before);
    }

    #[test]
    fn dynamics_sub_stop_increments() {
        let before = subscription_stops_total();
        record_dynamics_sub_stop();
        let after = subscription_stops_total();
        assert!(after > before);
    }

    #[test]
    fn dynamics_sub_panic_increments() {
        let before = subscription_panics_total();
        record_dynamics_sub_panic();
        let after = subscription_panics_total();
        assert!(after > before);
    }

    #[test]
    fn dynamics_reconcile_records_count_and_duration() {
        let before_count = reconcile_count();
        let before_dur = reconcile_duration_us_total();
        record_dynamics_reconcile(500);
        assert!(reconcile_count() > before_count);
        assert!(reconcile_duration_us_total() >= before_dur + 500);
    }

    #[test]
    fn dynamics_shutdown_records_duration() {
        record_dynamics_shutdown(1234, 2);
        assert_eq!(shutdown_duration_us_last(), 1234);
        let timeouts = shutdown_timed_out_total();
        assert!(timeouts >= 2);
    }

    #[test]
    fn dynamics_snapshot_consistent() {
        let snap = runtime_dynamics();
        assert_eq!(
            snap.sub_active_estimate,
            snap.sub_starts.saturating_sub(snap.sub_stops),
            "active estimate = starts - stops"
        );
        // The average is derived from a total that only grows, so it can
        // never exceed the total read afterwards. This holds even when
        // another test is halfway through recording a reconcile.
        assert!(
            snap.reconcile_avg_us <= reconcile_duration_us_total(),
            "avg reconcile duration should not exceed the accumulated total"
        );
    }
}