Skip to main content

ftui_runtime/
effect_system.rs

1#![forbid(unsafe_code)]
2
3//! Effect system observability and Cx-aware execution helpers.
4//!
5//! This module provides:
6//!
7//! - **Cx-aware task execution**: [`run_task_with_cx`] wraps a closure with
8//!   a [`Cx`] context for cooperative cancellation and deadline enforcement.
9//! - **Tracing spans**: `effect.command` and `effect.subscription` spans
10//!   with structured fields for observability dashboards.
11//! - **Metrics counters**: `effects_executed_total` (by type) and
12//!   `effect_duration_us` histogram approximation.
13//!
14//! # bd-37a.6: Command/Subscription effect system with Cx capability threading
15
16use std::sync::atomic::{AtomicU64, Ordering};
17use web_time::Instant;
18
19// ---------------------------------------------------------------------------
20// Monotonic counters
21// ---------------------------------------------------------------------------
22
23static EFFECTS_COMMAND_TOTAL: AtomicU64 = AtomicU64::new(0);
24static EFFECTS_SUBSCRIPTION_TOTAL: AtomicU64 = AtomicU64::new(0);
25static EFFECTS_QUEUE_ENQUEUED: AtomicU64 = AtomicU64::new(0);
26static EFFECTS_QUEUE_PROCESSED: AtomicU64 = AtomicU64::new(0);
27static EFFECTS_QUEUE_DROPPED: AtomicU64 = AtomicU64::new(0);
28static EFFECTS_QUEUE_HIGH_WATER: AtomicU64 = AtomicU64::new(0);
29
30/// Total command effects executed (monotonic counter).
31#[must_use]
32pub fn effects_command_total() -> u64 {
33    EFFECTS_COMMAND_TOTAL.load(Ordering::Relaxed)
34}
35
36/// Total subscription effects started (monotonic counter).
37#[must_use]
38pub fn effects_subscription_total() -> u64 {
39    EFFECTS_SUBSCRIPTION_TOTAL.load(Ordering::Relaxed)
40}
41
42/// Combined total of all effects executed.
43#[must_use]
44pub fn effects_executed_total() -> u64 {
45    effects_command_total() + effects_subscription_total()
46}
47
48// ---------------------------------------------------------------------------
49// Queue telemetry (bd-2zd0a)
50// ---------------------------------------------------------------------------
51
52/// Total tasks enqueued to the effect queue (monotonic counter).
53#[must_use]
54pub fn effects_queue_enqueued() -> u64 {
55    EFFECTS_QUEUE_ENQUEUED.load(Ordering::Relaxed)
56}
57
58/// Total tasks processed by the effect queue (monotonic counter).
59#[must_use]
60pub fn effects_queue_processed() -> u64 {
61    EFFECTS_QUEUE_PROCESSED.load(Ordering::Relaxed)
62}
63
64/// Total tasks dropped due to backpressure or shutdown (monotonic counter).
65#[must_use]
66pub fn effects_queue_dropped() -> u64 {
67    EFFECTS_QUEUE_DROPPED.load(Ordering::Relaxed)
68}
69
70/// High-water mark: maximum queue depth observed (ratchet — only increases).
71#[must_use]
72pub fn effects_queue_high_water() -> u64 {
73    EFFECTS_QUEUE_HIGH_WATER.load(Ordering::Relaxed)
74}
75
76/// Record a task enqueue, updating counters and high-water mark.
77pub fn record_queue_enqueue(current_depth: u64) {
78    EFFECTS_QUEUE_ENQUEUED.fetch_add(1, Ordering::Relaxed);
79    // Ratchet high-water mark upward.
80    let mut prev = EFFECTS_QUEUE_HIGH_WATER.load(Ordering::Relaxed);
81    while current_depth > prev {
82        match EFFECTS_QUEUE_HIGH_WATER.compare_exchange_weak(
83            prev,
84            current_depth,
85            Ordering::Relaxed,
86            Ordering::Relaxed,
87        ) {
88            Ok(_) => break,
89            Err(actual) => prev = actual,
90        }
91    }
92}
93
94/// Record a task processed by the effect queue.
95pub fn record_queue_processed() {
96    EFFECTS_QUEUE_PROCESSED.fetch_add(1, Ordering::Relaxed);
97}
98
99/// Record a task dropped due to backpressure or shutdown.
100pub fn record_queue_drop(reason: &str) {
101    EFFECTS_QUEUE_DROPPED.fetch_add(1, Ordering::Relaxed);
102    tracing::warn!(
103        target: "ftui.effect",
104        reason = reason,
105        monotonic.counter.effects_queue_dropped_total = 1_u64,
106        "effect queue task dropped"
107    );
108}
109
110/// Snapshot of queue telemetry for operator dashboards.
111#[derive(Debug, Clone, Copy, PartialEq, Eq)]
112pub struct QueueTelemetry {
113    /// Total tasks enqueued (monotonic).
114    pub enqueued: u64,
115    /// Total tasks processed (monotonic).
116    pub processed: u64,
117    /// Total tasks dropped (monotonic).
118    pub dropped: u64,
119    /// Maximum queue depth observed.
120    pub high_water: u64,
121    /// Current in-flight: enqueued - processed - dropped.
122    pub in_flight: u64,
123}
124
125/// Snapshot the current queue telemetry counters.
126#[must_use]
127pub fn queue_telemetry() -> QueueTelemetry {
128    let enqueued = effects_queue_enqueued();
129    let processed = effects_queue_processed();
130    let dropped = effects_queue_dropped();
131    let in_flight = enqueued.saturating_sub(processed).saturating_sub(dropped);
132    QueueTelemetry {
133        enqueued,
134        processed,
135        dropped,
136        high_water: effects_queue_high_water(),
137        in_flight,
138    }
139}
140
141// ---------------------------------------------------------------------------
142// Runtime dynamics instrumentation (bd-4flji)
143//
144// These metrics track the leading indicators of user-visible pain:
145// subscription churn, shutdown latency, and reconcile frequency.
146// ---------------------------------------------------------------------------
147
148static SUBSCRIPTION_STARTS_TOTAL: AtomicU64 = AtomicU64::new(0);
149static SUBSCRIPTION_STOPS_TOTAL: AtomicU64 = AtomicU64::new(0);
150static SUBSCRIPTION_PANICS_TOTAL: AtomicU64 = AtomicU64::new(0);
151static RECONCILE_COUNT: AtomicU64 = AtomicU64::new(0);
152static RECONCILE_DURATION_US_TOTAL: AtomicU64 = AtomicU64::new(0);
153static SHUTDOWN_DURATION_US_LAST: AtomicU64 = AtomicU64::new(0);
154static SHUTDOWN_TIMED_OUT_TOTAL: AtomicU64 = AtomicU64::new(0);
155
156/// Total subscription starts (monotonic counter).
157#[must_use]
158pub fn subscription_starts_total() -> u64 {
159    SUBSCRIPTION_STARTS_TOTAL.load(Ordering::Relaxed)
160}
161
162/// Total subscription stops (monotonic counter).
163#[must_use]
164pub fn subscription_stops_total() -> u64 {
165    SUBSCRIPTION_STOPS_TOTAL.load(Ordering::Relaxed)
166}
167
168/// Total subscription panics caught (monotonic counter).
169#[must_use]
170pub fn subscription_panics_total() -> u64 {
171    SUBSCRIPTION_PANICS_TOTAL.load(Ordering::Relaxed)
172}
173
174/// Total reconcile operations (monotonic counter).
175#[must_use]
176pub fn reconcile_count() -> u64 {
177    RECONCILE_COUNT.load(Ordering::Relaxed)
178}
179
180/// Cumulative reconcile duration in microseconds.
181#[must_use]
182pub fn reconcile_duration_us_total() -> u64 {
183    RECONCILE_DURATION_US_TOTAL.load(Ordering::Relaxed)
184}
185
186/// Most recent shutdown duration in microseconds (0 = no shutdown yet).
187#[must_use]
188pub fn shutdown_duration_us_last() -> u64 {
189    SHUTDOWN_DURATION_US_LAST.load(Ordering::Relaxed)
190}
191
192/// Total subscription join timeouts during shutdown (monotonic counter).
193#[must_use]
194pub fn shutdown_timed_out_total() -> u64 {
195    SHUTDOWN_TIMED_OUT_TOTAL.load(Ordering::Relaxed)
196}
197
198/// Record a subscription start event.
199pub fn record_dynamics_sub_start() {
200    SUBSCRIPTION_STARTS_TOTAL.fetch_add(1, Ordering::Relaxed);
201}
202
203/// Record a subscription stop event.
204pub fn record_dynamics_sub_stop() {
205    SUBSCRIPTION_STOPS_TOTAL.fetch_add(1, Ordering::Relaxed);
206}
207
208/// Record a subscription panic event.
209pub fn record_dynamics_sub_panic() {
210    SUBSCRIPTION_PANICS_TOTAL.fetch_add(1, Ordering::Relaxed);
211}
212
213/// Record a reconcile operation with its duration.
214pub fn record_dynamics_reconcile(duration_us: u64) {
215    RECONCILE_COUNT.fetch_add(1, Ordering::Relaxed);
216    RECONCILE_DURATION_US_TOTAL.fetch_add(duration_us, Ordering::Relaxed);
217}
218
219/// Record a shutdown completion with its duration and timeout count.
220pub fn record_dynamics_shutdown(duration_us: u64, timed_out: u64) {
221    SHUTDOWN_DURATION_US_LAST.store(duration_us, Ordering::Relaxed);
222    SHUTDOWN_TIMED_OUT_TOTAL.fetch_add(timed_out, Ordering::Relaxed);
223}
224
225/// Snapshot of runtime dynamics for operator dashboards and performance analysis.
226#[derive(Debug, Clone, Copy, PartialEq, Eq)]
227pub struct RuntimeDynamics {
228    /// Total subscription starts.
229    pub sub_starts: u64,
230    /// Total subscription stops.
231    pub sub_stops: u64,
232    /// Total subscription panics caught.
233    pub sub_panics: u64,
234    /// Current subscription churn: starts - stops.
235    pub sub_active_estimate: u64,
236    /// Total reconcile operations.
237    pub reconciles: u64,
238    /// Average reconcile duration in microseconds (0 if no reconciles yet).
239    pub reconcile_avg_us: u64,
240    /// Most recent shutdown duration in microseconds.
241    pub shutdown_last_us: u64,
242    /// Total join timeouts during shutdowns.
243    pub shutdown_timeouts: u64,
244}
245
246/// Snapshot the current runtime dynamics counters.
247#[must_use]
248pub fn runtime_dynamics() -> RuntimeDynamics {
249    let sub_starts = subscription_starts_total();
250    let sub_stops = subscription_stops_total();
251    let reconciles = reconcile_count();
252    let reconcile_total_us = reconcile_duration_us_total();
253    RuntimeDynamics {
254        sub_starts,
255        sub_stops,
256        sub_panics: subscription_panics_total(),
257        sub_active_estimate: sub_starts.saturating_sub(sub_stops),
258        reconciles,
259        reconcile_avg_us: reconcile_total_us.checked_div(reconciles).unwrap_or(0),
260        shutdown_last_us: shutdown_duration_us_last(),
261        shutdown_timeouts: shutdown_timed_out_total(),
262    }
263}
264
265// ---------------------------------------------------------------------------
266// Command effect instrumentation
267// ---------------------------------------------------------------------------
268
269/// Execute a command effect with tracing instrumentation.
270///
271/// Wraps command execution with an `effect.command` span recording
272/// `command_type`, `duration_us`, and `result`.
273pub fn trace_command_effect<F, R>(command_type: &str, f: F) -> R
274where
275    F: FnOnce() -> R,
276{
277    EFFECTS_COMMAND_TOTAL.fetch_add(1, Ordering::Relaxed);
278
279    let start = Instant::now();
280    let _span = tracing::debug_span!(
281        "effect.command",
282        command_type = %command_type,
283        duration_us = tracing::field::Empty,
284        result = tracing::field::Empty,
285    )
286    .entered();
287
288    tracing::debug!(
289        target: "ftui.effect",
290        command_type = %command_type,
291        "command effect started"
292    );
293
294    let result = f();
295    let duration_us = start.elapsed().as_micros() as u64;
296
297    tracing::debug!(
298        target: "ftui.effect",
299        command_type = %command_type,
300        duration_us = duration_us,
301        effect_duration_us = duration_us,
302        "command effect completed"
303    );
304
305    result
306}
307
308/// Record a command effect execution without wrapping (for inline instrumentation).
309pub fn record_command_effect(command_type: &str, duration_us: u64) {
310    EFFECTS_COMMAND_TOTAL.fetch_add(1, Ordering::Relaxed);
311
312    let _span = tracing::debug_span!(
313        "effect.command",
314        command_type = %command_type,
315        duration_us = duration_us,
316        result = "ok",
317    )
318    .entered();
319
320    tracing::debug!(
321        target: "ftui.effect",
322        command_type = %command_type,
323        duration_us = duration_us,
324        effect_duration_us = duration_us,
325        "command effect recorded"
326    );
327}
328
329// ---------------------------------------------------------------------------
330// Subscription effect instrumentation
331// ---------------------------------------------------------------------------
332
333/// Record a subscription lifecycle event.
334pub fn record_subscription_start(sub_type: &str, sub_id: u64) {
335    EFFECTS_SUBSCRIPTION_TOTAL.fetch_add(1, Ordering::Relaxed);
336
337    let _span = tracing::debug_span!(
338        "effect.subscription",
339        sub_type = %sub_type,
340        event_count = 0u64,
341        active = true,
342    )
343    .entered();
344
345    tracing::debug!(
346        target: "ftui.effect",
347        sub_type = %sub_type,
348        sub_id = sub_id,
349        active = true,
350        "subscription started"
351    );
352}
353
354/// Record a subscription stop event.
355pub fn record_subscription_stop(sub_type: &str, sub_id: u64, event_count: u64) {
356    let _span = tracing::debug_span!(
357        "effect.subscription",
358        sub_type = %sub_type,
359        event_count = event_count,
360        active = false,
361    )
362    .entered();
363
364    tracing::debug!(
365        target: "ftui.effect",
366        sub_type = %sub_type,
367        sub_id = sub_id,
368        event_count = event_count,
369        active = false,
370        "subscription stopped"
371    );
372}
373
374/// Record an effect timeout warning.
375pub fn warn_effect_timeout(effect_type: &str, deadline_us: u64) {
376    tracing::warn!(
377        target: "ftui.effect",
378        effect_type = %effect_type,
379        deadline_us = deadline_us,
380        "effect timeout exceeded deadline"
381    );
382}
383
384/// Record an effect panic error.
385pub fn error_effect_panic(effect_type: &str, panic_msg: &str) {
386    tracing::error!(
387        target: "ftui.effect",
388        effect_type = %effect_type,
389        panic_msg = %panic_msg,
390        "effect panicked during execution"
391    );
392}
393
394// ============================================================================
395// Tests
396// ============================================================================
397
398#[cfg(test)]
399mod tests {
400    use super::*;
401    use std::collections::HashMap;
402    use std::sync::{Arc, Mutex};
403    use tracing_subscriber::layer::SubscriberExt;
404    use tracing_subscriber::registry::LookupSpan;
405
406    // Tracing capture infrastructure
407    #[derive(Debug, Clone)]
408    #[allow(dead_code)]
409    struct CapturedSpan {
410        name: String,
411        fields: HashMap<String, String>,
412    }
413
414    #[derive(Debug, Clone)]
415    #[allow(dead_code)]
416    struct CapturedEvent {
417        level: tracing::Level,
418        target: String,
419        fields: HashMap<String, String>,
420    }
421
422    struct SpanCapture {
423        spans: Arc<Mutex<Vec<CapturedSpan>>>,
424        events: Arc<Mutex<Vec<CapturedEvent>>>,
425    }
426
427    impl SpanCapture {
428        fn new() -> (Self, CaptureHandle) {
429            let spans = Arc::new(Mutex::new(Vec::new()));
430            let events = Arc::new(Mutex::new(Vec::new()));
431            let handle = CaptureHandle {
432                spans: spans.clone(),
433                events: events.clone(),
434            };
435            (Self { spans, events }, handle)
436        }
437    }
438
439    struct CaptureHandle {
440        spans: Arc<Mutex<Vec<CapturedSpan>>>,
441        events: Arc<Mutex<Vec<CapturedEvent>>>,
442    }
443
444    impl CaptureHandle {
445        fn spans(&self) -> Vec<CapturedSpan> {
446            self.spans.lock().unwrap().clone()
447        }
448
449        fn events(&self) -> Vec<CapturedEvent> {
450            self.events.lock().unwrap().clone()
451        }
452    }
453
454    struct FieldVisitor(Vec<(String, String)>);
455
456    impl tracing::field::Visit for FieldVisitor {
457        fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
458            self.0
459                .push((field.name().to_string(), format!("{value:?}")));
460        }
461        fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
462            self.0.push((field.name().to_string(), value.to_string()));
463        }
464        fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
465            self.0.push((field.name().to_string(), value.to_string()));
466        }
467        fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
468            self.0.push((field.name().to_string(), value.to_string()));
469        }
470        fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
471            self.0.push((field.name().to_string(), value.to_string()));
472        }
473    }
474
475    impl<S> tracing_subscriber::Layer<S> for SpanCapture
476    where
477        S: tracing::Subscriber + for<'a> LookupSpan<'a>,
478    {
479        fn on_new_span(
480            &self,
481            attrs: &tracing::span::Attributes<'_>,
482            _id: &tracing::span::Id,
483            _ctx: tracing_subscriber::layer::Context<'_, S>,
484        ) {
485            let mut visitor = FieldVisitor(Vec::new());
486            attrs.record(&mut visitor);
487            let mut fields: HashMap<String, String> = visitor.0.into_iter().collect();
488            for field in attrs.metadata().fields() {
489                fields.entry(field.name().to_string()).or_default();
490            }
491            self.spans.lock().unwrap().push(CapturedSpan {
492                name: attrs.metadata().name().to_string(),
493                fields,
494            });
495        }
496
497        fn on_event(
498            &self,
499            event: &tracing::Event<'_>,
500            _ctx: tracing_subscriber::layer::Context<'_, S>,
501        ) {
502            let mut visitor = FieldVisitor(Vec::new());
503            event.record(&mut visitor);
504            let fields: HashMap<String, String> = visitor.0.into_iter().collect();
505            self.events.lock().unwrap().push(CapturedEvent {
506                level: *event.metadata().level(),
507                target: event.metadata().target().to_string(),
508                fields,
509            });
510        }
511    }
512
513    fn with_captured_tracing<F>(f: F) -> CaptureHandle
514    where
515        F: FnOnce(),
516    {
517        let (layer, handle) = SpanCapture::new();
518        let subscriber = tracing_subscriber::registry().with(layer);
519        tracing::subscriber::with_default(subscriber, f);
520        handle
521    }
522
523    // =====================================================================
524    // Command effect tests
525    // =====================================================================
526
527    #[test]
528    fn trace_command_effect_emits_span() {
529        let handle = with_captured_tracing(|| {
530            trace_command_effect("task", || 42);
531        });
532
533        let spans = handle.spans();
534        let cmd_spans: Vec<_> = spans
535            .iter()
536            .filter(|s| s.name == "effect.command")
537            .collect();
538        assert!(!cmd_spans.is_empty(), "expected effect.command span");
539        assert!(cmd_spans[0].fields.contains_key("command_type"));
540    }
541
542    #[test]
543    fn trace_command_effect_returns_value() {
544        let result = trace_command_effect("test", || 42);
545        assert_eq!(result, 42);
546    }
547
548    #[test]
549    fn trace_command_effect_debug_events() {
550        let handle = with_captured_tracing(|| {
551            trace_command_effect("file_io", || {});
552        });
553
554        let events = handle.events();
555        let start_events: Vec<_> = events
556            .iter()
557            .filter(|e| {
558                e.target == "ftui.effect"
559                    && e.fields
560                        .get("message")
561                        .is_some_and(|m| m.contains("started"))
562            })
563            .collect();
564        assert!(!start_events.is_empty(), "expected start event");
565
566        let complete_events: Vec<_> = events
567            .iter()
568            .filter(|e| {
569                e.target == "ftui.effect"
570                    && e.fields
571                        .get("message")
572                        .is_some_and(|m| m.contains("completed"))
573            })
574            .collect();
575        assert!(!complete_events.is_empty(), "expected complete event");
576
577        let evt = &complete_events[0];
578        assert!(
579            evt.fields.contains_key("duration_us"),
580            "missing duration_us"
581        );
582        assert!(
583            evt.fields.contains_key("effect_duration_us"),
584            "missing effect_duration_us histogram"
585        );
586    }
587
588    #[test]
589    fn record_command_effect_emits_span() {
590        let handle = with_captured_tracing(|| {
591            record_command_effect("clipboard", 150);
592        });
593
594        let spans = handle.spans();
595        let cmd_spans: Vec<_> = spans
596            .iter()
597            .filter(|s| s.name == "effect.command")
598            .collect();
599        assert!(!cmd_spans.is_empty());
600        assert_eq!(
601            cmd_spans[0].fields.get("command_type").unwrap(),
602            "clipboard"
603        );
604    }
605
606    // =====================================================================
607    // Subscription effect tests
608    // =====================================================================
609
610    #[test]
611    fn record_subscription_start_emits_span() {
612        let handle = with_captured_tracing(|| {
613            record_subscription_start("timer", 42);
614        });
615
616        let spans = handle.spans();
617        let sub_spans: Vec<_> = spans
618            .iter()
619            .filter(|s| s.name == "effect.subscription")
620            .collect();
621        assert!(!sub_spans.is_empty(), "expected effect.subscription span");
622        assert!(sub_spans[0].fields.contains_key("sub_type"));
623        assert!(sub_spans[0].fields.contains_key("active"));
624    }
625
626    #[test]
627    fn record_subscription_stop_emits_span() {
628        let handle = with_captured_tracing(|| {
629            record_subscription_stop("keyboard", 7, 100);
630        });
631
632        let spans = handle.spans();
633        let sub_spans: Vec<_> = spans
634            .iter()
635            .filter(|s| s.name == "effect.subscription")
636            .collect();
637        assert!(!sub_spans.is_empty());
638        assert!(sub_spans[0].fields.contains_key("event_count"));
639    }
640
641    // =====================================================================
642    // Warning/error log tests
643    // =====================================================================
644
645    #[test]
646    fn warn_effect_timeout_emits_warn_event() {
647        let handle = with_captured_tracing(|| {
648            warn_effect_timeout("task", 500_000);
649        });
650
651        let events = handle.events();
652        let warn_events: Vec<_> = events
653            .iter()
654            .filter(|e| e.level == tracing::Level::WARN && e.target == "ftui.effect")
655            .collect();
656        assert!(!warn_events.is_empty(), "expected WARN event for timeout");
657    }
658
659    #[test]
660    fn error_effect_panic_emits_error_event() {
661        let handle = with_captured_tracing(|| {
662            error_effect_panic("subscription", "thread panicked");
663        });
664
665        let events = handle.events();
666        let error_events: Vec<_> = events
667            .iter()
668            .filter(|e| e.level == tracing::Level::ERROR && e.target == "ftui.effect")
669            .collect();
670        assert!(!error_events.is_empty(), "expected ERROR event for panic");
671    }
672
673    // =====================================================================
674    // Counter tests
675    // =====================================================================
676
677    #[test]
678    fn counter_accessors_callable() {
679        let cmd = effects_command_total();
680        let sub = effects_subscription_total();
681        let total = effects_executed_total();
682        assert_eq!(total, cmd + sub);
683    }
684
685    #[test]
686    fn counters_increment_on_command() {
687        let before = effects_command_total();
688        trace_command_effect("test", || {});
689        let after = effects_command_total();
690        assert!(
691            after > before,
692            "command counter should increment: {before} → {after}"
693        );
694    }
695
696    #[test]
697    fn counters_increment_on_subscription() {
698        let before = effects_subscription_total();
699        record_subscription_start("test", 1);
700        let after = effects_subscription_total();
701        assert!(
702            after > before,
703            "subscription counter should increment: {before} → {after}"
704        );
705    }
706
707    // =========================================================================
708    // Queue telemetry tests (bd-2zd0a)
709    // =========================================================================
710
711    #[test]
712    fn queue_enqueue_increments_counter() {
713        let before = effects_queue_enqueued();
714        record_queue_enqueue(1);
715        let after = effects_queue_enqueued();
716        assert!(after > before, "enqueued counter should increment");
717    }
718
719    #[test]
720    fn queue_processed_increments_counter() {
721        let before = effects_queue_processed();
722        record_queue_processed();
723        let after = effects_queue_processed();
724        assert!(after > before, "processed counter should increment");
725    }
726
727    #[test]
728    fn queue_drop_increments_counter() {
729        let before = effects_queue_dropped();
730        record_queue_drop("test");
731        let after = effects_queue_dropped();
732        assert!(after > before, "dropped counter should increment");
733    }
734
735    #[test]
736    fn queue_high_water_ratchets_upward() {
737        let before = effects_queue_high_water();
738        let new_mark = before + 100;
739        record_queue_enqueue(new_mark);
740        assert!(
741            effects_queue_high_water() >= new_mark,
742            "high-water should ratchet to at least {new_mark}"
743        );
744        // Lower value should NOT reduce the high-water mark
745        record_queue_enqueue(1);
746        assert!(
747            effects_queue_high_water() >= new_mark,
748            "high-water should not decrease"
749        );
750    }
751
752    #[test]
753    fn queue_telemetry_snapshot_consistent() {
754        let snap = queue_telemetry();
755        // in_flight = enqueued - processed - dropped, all saturating
756        assert_eq!(
757            snap.in_flight,
758            snap.enqueued
759                .saturating_sub(snap.processed)
760                .saturating_sub(snap.dropped),
761            "in_flight should be enqueued - processed - dropped"
762        );
763    }
764
765    // =========================================================================
766    // Runtime dynamics tests (bd-4flji)
767    // =========================================================================
768
769    #[test]
770    fn dynamics_sub_start_increments() {
771        let before = subscription_starts_total();
772        record_dynamics_sub_start();
773        let after = subscription_starts_total();
774        assert!(after > before);
775    }
776
777    #[test]
778    fn dynamics_sub_stop_increments() {
779        let before = subscription_stops_total();
780        record_dynamics_sub_stop();
781        let after = subscription_stops_total();
782        assert!(after > before);
783    }
784
785    #[test]
786    fn dynamics_sub_panic_increments() {
787        let before = subscription_panics_total();
788        record_dynamics_sub_panic();
789        let after = subscription_panics_total();
790        assert!(after > before);
791    }
792
793    #[test]
794    fn dynamics_reconcile_records_count_and_duration() {
795        let before_count = reconcile_count();
796        let before_dur = reconcile_duration_us_total();
797        record_dynamics_reconcile(500);
798        assert!(reconcile_count() > before_count);
799        assert!(reconcile_duration_us_total() >= before_dur + 500);
800    }
801
802    #[test]
803    fn dynamics_shutdown_records_duration() {
804        record_dynamics_shutdown(1234, 2);
805        assert_eq!(shutdown_duration_us_last(), 1234);
806        let timeouts = shutdown_timed_out_total();
807        assert!(timeouts >= 2);
808    }
809
810    #[test]
811    fn dynamics_snapshot_consistent() {
812        let snap = runtime_dynamics();
813        assert_eq!(
814            snap.sub_active_estimate,
815            snap.sub_starts.saturating_sub(snap.sub_stops),
816            "active estimate = starts - stops"
817        );
818        if snap.reconciles > 0 {
819            assert!(
820                snap.reconcile_avg_us > 0 || reconcile_duration_us_total() == 0,
821                "avg should be > 0 when reconciles happened with non-zero duration"
822            );
823        }
824    }
825}