epics-bridge-rs 0.18.2

EPICS protocol bridges: Record↔PVA (QSRV), CA gateway, pvalink, PVA gateway
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
//! Gateway statistics.
//!
//! Tracks runtime metrics and exposes them as PVs hosted by the gateway's
//! own shadow [`PvDatabase`]. Downstream clients can read these PVs to
//! monitor the gateway itself (`gateway:totalPvs`, `gateway:vcCount`, etc.).
//!
//! Corresponds to C++ `gateStat`.
//!
//! ## Exposed PVs
//!
//! All names use the configurable prefix (default `"gateway:"`).
//!
//! Native names:
//!
//! | PV | Type | Description |
//! |----|------|-------------|
//! | `<prefix>totalPvs` | Long | Total entries in the cache (all states) |
//! | `<prefix>upstreamCount` | Long | Active upstream subscriptions |
//! | `<prefix>connectingCount` | Long | PVs in Connecting state |
//! | `<prefix>activeCount` | Long | PVs in Active state |
//! | `<prefix>inactiveCount` | Long | PVs in Inactive state |
//! | `<prefix>deadCount` | Long | PVs in Dead state |
//! | `<prefix>eventRate` | Double | Events/sec averaged over stats interval |
//! | `<prefix>totalEvents` | Long | Cumulative event count |
//! | `<prefix>heartbeat` | Long | Incrementing heartbeat counter |
//! | `<prefix>putCount` | Long | Cumulative put count (for putlog) |
//! | `<prefix>readOnlyRejects` | Long | Puts rejected because read_only=true |
//! | `<prefix>perHostConnections` | Long | Distinct downstream client hosts |
//!
//! C++ ca-gateway compatibility aliases (B-G10) — kept so dashboards
//! and scripts written against the C source's `gateServer.cc:1903-1965`
//! names keep working against the Rust gateway:
//!
//! | PV | Type | Maps to |
//! |----|------|---------|
//! | `<prefix>vctotal` | Long | totalPvs (virtual-channel total) |
//! | `<prefix>pvtotal` | Long | totalPvs (real-PV total — same source as vctotal in C) |
//! | `<prefix>connected` | Long | active + inactive (upstream-alive) |
//! | `<prefix>active` | Long | activeCount |
//! | `<prefix>inactive` | Long | inactiveCount |
//! | `<prefix>unconnected` | Long | connecting + dead (upstream-not-alive) |
//! | `<prefix>dead` | Long | deadCount |
//! | `<prefix>connecting` | Long | connectingCount |
//! | `<prefix>disconnected` | Long | deadCount (alias — C source treats these as the same bucket) |
//! | `<prefix>clientEventRate` | Double | eventRate |
//!
//! RATE_STATS internals (B5) — tokio-model equivalents of the C++
//! ca-gateway `gateServer` RATE_STATS counters from `gateServer.cc`.
//! The C source increments these inside its event-driven main loop;
//! the Rust port increments atomic counters at the equivalent points
//! (events received from upstream, monitor posts fanned out
//! downstream, run-loop iterations):
//!
//! | PV | Type | Description |
//! |----|------|---------|
//! | `<prefix>fd` | Long | Current open file-descriptor count |
//! | `<prefix>clientEventCount` | Long | Cumulative upstream events received |
//! | `<prefix>postEventCount` | Long | Cumulative monitor posts fanned downstream |
//! | `<prefix>loopCount` | Long | Cumulative gateway run-loop iterations |

use std::collections::HashSet;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

use epics_base_rs::server::database::PvDatabase;
use epics_base_rs::types::EpicsValue;
use tokio::sync::{Mutex, RwLock};

use super::cache::PvCache;

/// Gateway runtime statistics.
///
/// Holds the in-memory counters that back the gateway's stats PVs. The
/// public counters are plain atomics bumped through the `record_*`
/// helpers; the remaining fields are private bookkeeping used when the
/// PVs are refreshed.
pub struct Stats {
    /// PV name prefix prepended to every stats PV suffix. An empty
    /// prefix makes `publish_initial` and `refresh` return without
    /// touching the database.
    prefix: String,
    /// Cumulative event count from upstream (incremented in cache updater).
    /// Also the source for the `clientEventCount` PV.
    pub total_events: AtomicU64,
    /// Cumulative put count.
    pub put_count: AtomicU64,
    /// Puts rejected because gateway is in read-only mode.
    pub read_only_rejects: AtomicU64,
    /// Heartbeat counter, advanced by one on every `heartbeat_tick`.
    pub heartbeat: AtomicU64,
    /// B5 RATE_STATS: cumulative monitor posts fanned out to the
    /// downstream shadow database. Incremented once per
    /// `put_pv_and_post` in the upstream forwarding task. Mirrors C++
    /// ca-gateway `gateServer::postEventCount`.
    pub post_event_count: AtomicU64,
    /// B5 RATE_STATS: cumulative gateway run-loop iterations. The C++
    /// gateway increments this per fdManager event-loop pass; the
    /// tokio port has no single event loop, so it is incremented once
    /// per periodic maintenance tick (cleanup / stats / heartbeat
    /// timers each call `record_loop`). Mirrors `gateServer::loopCount`.
    pub loop_count: AtomicU64,
    /// Per-host connection set, kept behind a mutex for distinct counting.
    /// Stored as a set so recording the same host twice counts once.
    per_host: Mutex<HashSet<String>>,
    /// Last refresh timestamp for event rate calculation.
    /// Initialised to the construction time in `new`.
    last_refresh: Mutex<Instant>,
    /// Last total_events value at refresh time, for delta calculation.
    last_total_events: AtomicU64,
}

impl Stats {
    pub fn new(prefix: String) -> Self {
        Self {
            prefix,
            total_events: AtomicU64::new(0),
            put_count: AtomicU64::new(0),
            read_only_rejects: AtomicU64::new(0),
            heartbeat: AtomicU64::new(0),
            post_event_count: AtomicU64::new(0),
            loop_count: AtomicU64::new(0),
            per_host: Mutex::new(HashSet::new()),
            last_refresh: Mutex::new(Instant::now()),
            last_total_events: AtomicU64::new(0),
        }
    }

    /// Record an upstream event.
    pub fn record_event(&self) {
        self.total_events.fetch_add(1, Ordering::Relaxed);
    }

    /// Record a put operation.
    pub fn record_put(&self) {
        self.put_count.fetch_add(1, Ordering::Relaxed);
    }

    /// B5: record a monitor post fanned out to the downstream shadow
    /// database. Called once per `put_pv_and_post` in the upstream
    /// forwarding task.
    pub fn record_post_event(&self) {
        self.post_event_count.fetch_add(1, Ordering::Relaxed);
    }

    /// B5: record one gateway run-loop iteration. Called from each
    /// periodic maintenance tick (cleanup / stats / heartbeat).
    pub fn record_loop(&self) {
        self.loop_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Record a put that was rejected by read-only mode.
    pub fn record_readonly_reject(&self) {
        self.read_only_rejects.fetch_add(1, Ordering::Relaxed);
    }

    /// Track a downstream client host (for per-host connection count).
    pub async fn record_host(&self, host: &str) {
        self.per_host.lock().await.insert(host.to_string());
    }

    /// Forget a downstream client host (on disconnect).
    pub async fn forget_host(&self, host: &str) {
        self.per_host.lock().await.remove(host);
    }

    /// Distinct downstream client host count.
    pub async fn host_count(&self) -> usize {
        self.per_host.lock().await.len()
    }

    /// Pre-register all stats PVs in the shadow database with placeholder values.
    /// Called once during gateway build.
    pub async fn publish_initial(&self, db: &PvDatabase) {
        let p = &self.prefix;
        if p.is_empty() {
            return;
        }

        for (suffix, init) in [
            ("totalPvs", EpicsValue::Long(0)),
            ("upstreamCount", EpicsValue::Long(0)),
            ("connectingCount", EpicsValue::Long(0)),
            ("activeCount", EpicsValue::Long(0)),
            ("inactiveCount", EpicsValue::Long(0)),
            ("deadCount", EpicsValue::Long(0)),
            ("eventRate", EpicsValue::Double(0.0)),
            ("totalEvents", EpicsValue::Long(0)),
            ("heartbeat", EpicsValue::Long(0)),
            ("putCount", EpicsValue::Long(0)),
            ("readOnlyRejects", EpicsValue::Long(0)),
            ("perHostConnections", EpicsValue::Long(0)),
            // B-G10: aliases matching C++ ca-gateway (gateServer.cc:
            // 1903-1965) so dashboards/scripts written against the C
            // names keep working. Connected = active + inactive
            // (both are "upstream is alive"); pvtotal/vctotal are
            // both alias for totalPvs in the C source.
            ("vctotal", EpicsValue::Long(0)),
            ("pvtotal", EpicsValue::Long(0)),
            ("connected", EpicsValue::Long(0)),
            ("active", EpicsValue::Long(0)),
            ("inactive", EpicsValue::Long(0)),
            ("unconnected", EpicsValue::Long(0)),
            ("dead", EpicsValue::Long(0)),
            ("connecting", EpicsValue::Long(0)),
            ("disconnected", EpicsValue::Long(0)),
            ("clientEventRate", EpicsValue::Double(0.0)),
            // B5: RATE_STATS internals — tokio-model equivalents of
            // the C++ ca-gateway gateServer counters.
            ("fd", EpicsValue::Long(0)),
            ("clientEventCount", EpicsValue::Long(0)),
            ("postEventCount", EpicsValue::Long(0)),
            ("loopCount", EpicsValue::Long(0)),
        ] {
            let pv = format!("{p}{suffix}");
            if let Err(e) = db.add_pv(&pv, init).await {
                tracing::warn!(
                    pv = %pv,
                    error = %e,
                    "ca_gateway stats: pre-register skipped (name already in use)"
                );
            }
        }
    }

    /// Refresh stats PVs in the database from current cache + counters.
    /// Called periodically by the stats timer in the main event loop.
    pub async fn refresh(
        &self,
        cache: &RwLock<PvCache>,
        db: &PvDatabase,
        cache_size: usize,
        upstream_count: usize,
    ) {
        if self.prefix.is_empty() {
            return;
        }

        // Compute counts by state via the single-pass count_states
        // helper (B-G13). Snapshot inside count_states releases the
        // per-entry Arc borrows once collected, so the outer
        // `cache.read().await` doesn't span the per-entry awaits.
        let cache_guard = cache.read().await;
        let (connecting, active, inactive, dead, _disconnect) = cache_guard.count_states().await;
        drop(cache_guard);

        // Compute event rate over the interval since last refresh
        let now = Instant::now();
        let mut last = self.last_refresh.lock().await;
        let elapsed = now.duration_since(*last).as_secs_f64();
        *last = now;
        drop(last);

        let total_events = self.total_events.load(Ordering::Relaxed);
        let last_events = self.last_total_events.swap(total_events, Ordering::Relaxed);
        let delta = total_events.saturating_sub(last_events);
        let event_rate = if elapsed > 0.0 {
            delta as f64 / elapsed
        } else {
            0.0
        };

        let put_count = self.put_count.load(Ordering::Relaxed);
        let readonly = self.read_only_rejects.load(Ordering::Relaxed);
        let heartbeat = self.heartbeat.load(Ordering::Relaxed);
        let host_count = self.host_count().await;

        // B5 RATE_STATS internals.
        let post_event_count = self.post_event_count.load(Ordering::Relaxed);
        let loop_count = self.loop_count.load(Ordering::Relaxed);
        // `clientEventCount` is the same upstream-event source as
        // `total_events` — the C++ gateway exposes both the rate PV
        // (`clientEventRate`) and the raw count (`clientEventCount`)
        // from one counter.
        let client_event_count = total_events;

        // Fan all 12 stats PV writes out concurrently. Each
        // `put_pv_and_post` is independent (no shared lock between them
        // beyond the per-PV `RwLock`), so a single `tokio::join!` cuts
        // refresh latency from `12 × put_latency` to `max(put_latency)`.
        let p = &self.prefix;
        // Bind names to locals so the futures inside `join!` borrow them
        // for long enough; bare `&format!(...)` would be dropped at the
        // end of the macro line.
        let n_total = format!("{p}totalPvs");
        let n_upstream = format!("{p}upstreamCount");
        let n_connecting = format!("{p}connectingCount");
        let n_active = format!("{p}activeCount");
        let n_inactive = format!("{p}inactiveCount");
        let n_dead = format!("{p}deadCount");
        let n_rate = format!("{p}eventRate");
        let n_events = format!("{p}totalEvents");
        let n_heartbeat = format!("{p}heartbeat");
        let n_put = format!("{p}putCount");
        let n_readonly = format!("{p}readOnlyRejects");
        let n_hosts = format!("{p}perHostConnections");
        // C++ ca-gateway aliases (B-G10).
        let n_vctotal = format!("{p}vctotal");
        let n_pvtotal = format!("{p}pvtotal");
        let n_connected = format!("{p}connected");
        let n_active_alias = format!("{p}active");
        let n_inactive_alias = format!("{p}inactive");
        let n_unconnected = format!("{p}unconnected");
        let n_dead_alias = format!("{p}dead");
        let n_connecting_alias = format!("{p}connecting");
        let n_disconnected = format!("{p}disconnected");
        let n_client_event_rate = format!("{p}clientEventRate");
        // B5 RATE_STATS PV names.
        let n_fd = format!("{p}fd");
        let n_client_event_count = format!("{p}clientEventCount");
        let n_post_event_count = format!("{p}postEventCount");
        let n_loop_count = format!("{p}loopCount");
        let connected = (active + inactive) as i32;
        let unconnected = (connecting + dead) as i32;
        // Sample the live open-fd count. `open_fd_count` reads a kernel
        // directory; on the rare platform where neither exists, keep
        // the PV at its previous value rather than posting a bogus 0.
        let fd_count = open_fd_count();
        let _ = tokio::join!(
            db.put_pv_and_post(&n_total, EpicsValue::Long(cache_size as i32)),
            db.put_pv_and_post(&n_upstream, EpicsValue::Long(upstream_count as i32)),
            db.put_pv_and_post(&n_connecting, EpicsValue::Long(connecting as i32)),
            db.put_pv_and_post(&n_active, EpicsValue::Long(active as i32)),
            db.put_pv_and_post(&n_inactive, EpicsValue::Long(inactive as i32)),
            db.put_pv_and_post(&n_dead, EpicsValue::Long(dead as i32)),
            db.put_pv_and_post(&n_rate, EpicsValue::Double(event_rate)),
            db.put_pv_and_post(&n_events, EpicsValue::Long(total_events as i32)),
            db.put_pv_and_post(&n_heartbeat, EpicsValue::Long(heartbeat as i32)),
            db.put_pv_and_post(&n_put, EpicsValue::Long(put_count as i32)),
            db.put_pv_and_post(&n_readonly, EpicsValue::Long(readonly as i32)),
            db.put_pv_and_post(&n_hosts, EpicsValue::Long(host_count as i32)),
            db.put_pv_and_post(&n_vctotal, EpicsValue::Long(cache_size as i32)),
            db.put_pv_and_post(&n_pvtotal, EpicsValue::Long(cache_size as i32)),
            db.put_pv_and_post(&n_connected, EpicsValue::Long(connected)),
            db.put_pv_and_post(&n_active_alias, EpicsValue::Long(active as i32)),
            db.put_pv_and_post(&n_inactive_alias, EpicsValue::Long(inactive as i32)),
            db.put_pv_and_post(&n_unconnected, EpicsValue::Long(unconnected)),
            db.put_pv_and_post(&n_dead_alias, EpicsValue::Long(dead as i32)),
            db.put_pv_and_post(&n_connecting_alias, EpicsValue::Long(connecting as i32)),
            db.put_pv_and_post(&n_disconnected, EpicsValue::Long(dead as i32)),
            db.put_pv_and_post(&n_client_event_rate, EpicsValue::Double(event_rate)),
            // B5 RATE_STATS internals.
            db.put_pv_and_post(
                &n_client_event_count,
                EpicsValue::Long(client_event_count as i32),
            ),
            db.put_pv_and_post(
                &n_post_event_count,
                EpicsValue::Long(post_event_count as i32),
            ),
            db.put_pv_and_post(&n_loop_count, EpicsValue::Long(loop_count as i32)),
        );

        // `fd` is posted separately because it is only available when
        // the kernel fd directory could be read; on an unsupported
        // platform we leave the PV at its last value rather than
        // posting a misleading 0.
        if let Some(fd) = fd_count {
            let _ = db.put_pv_and_post(&n_fd, EpicsValue::Long(fd as i32)).await;
        }
    }

    /// Increment the heartbeat counter and post to the heartbeat PV.
    pub async fn heartbeat_tick(&self, db: &PvDatabase) {
        let n = self.heartbeat.fetch_add(1, Ordering::Relaxed) + 1;
        if !self.prefix.is_empty() {
            let _ = db
                .put_pv_and_post(
                    &format!("{}heartbeat", self.prefix),
                    EpicsValue::Long(n as i32),
                )
                .await;
        }
    }

    pub fn prefix(&self) -> &str {
        &self.prefix
    }
}

/// B5: report how many file descriptors this process currently has open.
///
/// The C++ ca-gateway publishes `fd` from `fdManager`'s registered
/// descriptor table. The tokio port has no such table, so the number
/// is taken from the kernel's per-process fd directory instead:
///
/// - Linux: `/proc/self/fd`
/// - macOS / *BSD: `/dev/fd`
///
/// The candidates are tried in that order and the first one that can
/// be opened decides the result. Each successfully read directory
/// entry counts as one descriptor, and one is subtracted for the
/// handle `read_dir` itself holds open while enumerating, so the
/// result reflects steady-state usage.
///
/// Returns `None` when neither directory can be read; the caller then
/// leaves the stat PV at its last value rather than reporting a
/// misleading zero.
pub fn open_fd_count() -> Option<u64> {
    // Linux first: `/proc/self/fd` is authoritative there, and on Linux
    // `/dev/fd` is only a symlink into procfs. `/dev/fd` covers macOS
    // and the BSDs (fdescfs mount).
    const FD_DIRS: [&str; 2] = ["/proc/self/fd", "/dev/fd"];

    FD_DIRS.iter().find_map(|path| {
        // A directory that cannot be opened just moves us on to the
        // next candidate.
        let listing = std::fs::read_dir(path).ok()?;
        let readable = listing.filter(Result::is_ok).count() as u64;
        // Discount the enumeration handle, which appears in its own
        // listing. Saturating keeps an (impossible) empty directory at
        // 0 instead of underflowing.
        Some(readable.saturating_sub(1))
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    /// The basic `record_*` helpers each bump their own counter.
    #[test]
    fn counters_increment() {
        let stats = Stats::new("g:".into());
        assert_eq!(stats.total_events.load(Ordering::Relaxed), 0);
        stats.record_event();
        stats.record_event();
        assert_eq!(stats.total_events.load(Ordering::Relaxed), 2);

        stats.record_put();
        assert_eq!(stats.put_count.load(Ordering::Relaxed), 1);

        stats.record_readonly_reject();
        assert_eq!(stats.read_only_rejects.load(Ordering::Relaxed), 1);
    }

    /// Hosts are counted distinctly and can be forgotten again.
    #[tokio::test]
    async fn host_tracking() {
        let stats = Stats::new("g:".into());
        assert_eq!(stats.host_count().await, 0);

        stats.record_host("host1").await;
        stats.record_host("host2").await;
        stats.record_host("host1").await; // duplicate
        assert_eq!(stats.host_count().await, 2);

        stats.forget_host("host1").await;
        assert_eq!(stats.host_count().await, 1);
    }

    /// `publish_initial` registers the native stats PVs under the prefix.
    #[tokio::test]
    async fn publish_initial_creates_pvs() {
        let stats = Stats::new("g:".into());
        let db = PvDatabase::new();
        stats.publish_initial(&db).await;

        assert!(db.has_name("g:totalPvs").await);
        assert!(db.has_name("g:heartbeat").await);
        assert!(db.has_name("g:eventRate").await);
    }

    /// An empty prefix disables PV registration entirely.
    #[tokio::test]
    async fn empty_prefix_skips_publish() {
        let stats = Stats::new("".into());
        let db = PvDatabase::new();
        stats.publish_initial(&db).await;
        assert!(!db.has_name("totalPvs").await);
    }

    // --- B5: fd / RATE_STATS counters ---

    /// The RATE_STATS helpers bump `post_event_count` / `loop_count`.
    #[test]
    fn rate_stats_counters_increment() {
        let stats = Stats::new("g:".into());
        assert_eq!(stats.post_event_count.load(Ordering::Relaxed), 0);
        assert_eq!(stats.loop_count.load(Ordering::Relaxed), 0);

        stats.record_post_event();
        stats.record_post_event();
        stats.record_post_event();
        assert_eq!(stats.post_event_count.load(Ordering::Relaxed), 3);

        stats.record_loop();
        assert_eq!(stats.loop_count.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn open_fd_count_is_plausible() {
        // The test process always has at least stdin/stdout/stderr
        // open, so on any supported platform the count is non-zero.
        // On an unsupported platform `None` is acceptable.
        if let Some(n) = open_fd_count() {
            assert!(n >= 3, "expected at least 3 open fds, got {n}");
        }
    }

    #[test]
    fn open_fd_count_tracks_new_descriptors() {
        /// Removes the probe files when dropped, so they are cleaned up
        /// even when an assertion or `expect` below panics.
        struct ProbeFiles(Vec<std::path::PathBuf>);

        impl Drop for ProbeFiles {
            fn drop(&mut self) {
                for p in &self.0 {
                    let _ = std::fs::remove_file(p);
                }
            }
        }

        let before = match open_fd_count() {
            Some(n) => n,
            None => return, // unsupported platform — nothing to assert
        };
        // Open a batch of descriptors at once. A single fd would be
        // lost in the noise of a parallel test runner (other threads
        // open/close fds concurrently); a batch of 32 produces a
        // delta that comfortably exceeds that noise floor. The files
        // are held open in `held` until the assertion has run.
        const BATCH: usize = 32;
        let dir = std::env::temp_dir();
        // Declared before `held` so it is dropped after it: the files
        // are closed first, then removed.
        let mut probe_files = ProbeFiles(Vec::with_capacity(BATCH));
        let mut held = Vec::with_capacity(BATCH);
        for i in 0..BATCH {
            let p = dir.join(format!("ca_gw_stats_fd_probe_{}_{i}", std::process::id()));
            let file = std::fs::File::create(&p).expect("create temp file");
            probe_files.0.push(p);
            held.push(file);
        }
        let during = open_fd_count().expect("fd count available");
        // The count must have risen by at least half the batch even
        // if a few descriptors are transiently miscounted under
        // parallel test execution.
        assert!(
            during >= before + (BATCH as u64) / 2,
            "open fd count did not rise enough: before={before} during={during}"
        );
        drop(held);
    }

    /// `publish_initial` also registers the B5 RATE_STATS PVs.
    #[tokio::test]
    async fn publish_initial_creates_rate_stats_pvs() {
        let stats = Stats::new("g:".into());
        let db = PvDatabase::new();
        stats.publish_initial(&db).await;

        assert!(db.has_name("g:fd").await);
        assert!(db.has_name("g:clientEventCount").await);
        assert!(db.has_name("g:postEventCount").await);
        assert!(db.has_name("g:loopCount").await);
    }

    /// `refresh` copies the RATE_STATS counters into their PVs.
    #[tokio::test]
    async fn refresh_publishes_rate_stats() {
        let stats = Stats::new("g:".into());
        let db = PvDatabase::new();
        stats.publish_initial(&db).await;

        // Drive the counters, then refresh and confirm the PVs reflect
        // the new values.
        stats.record_event(); // clientEventCount source
        stats.record_event();
        stats.record_post_event();
        stats.record_loop();
        stats.record_loop();
        stats.record_loop();

        let cache = RwLock::new(PvCache::new());
        stats.refresh(&cache, &db, 0, 0).await;

        assert_eq!(
            db.get_pv("g:clientEventCount").await.unwrap(),
            EpicsValue::Long(2)
        );
        assert_eq!(
            db.get_pv("g:postEventCount").await.unwrap(),
            EpicsValue::Long(1)
        );
        assert_eq!(db.get_pv("g:loopCount").await.unwrap(), EpicsValue::Long(3));
        // `fd` should have been posted with a plausible value on any
        // supported platform.
        if let Ok(EpicsValue::Long(fd)) = db.get_pv("g:fd").await {
            assert!(fd >= 0);
        }
    }
}