1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
//! Process-wide registry of open [`PvaLink`]s, keyed by PV name + direction.
//!
//! Used by record handlers so multiple records pointing at the same PV
//! share a single underlying client connection.
use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::RwLock;
use tokio::sync::Notify;
use super::config::{LinkDirection, PvaLinkConfig};
use super::link::{PvaLink, PvaLinkResult};
type RegistryKey = (String, LinkDirection);
/// Cached PvaLink. Returns the same `Arc<PvaLink>` for repeated `(pv, direction)` pairs.
#[derive(Default)]
pub struct PvaLinkRegistry {
map: RwLock<HashMap<RegistryKey, Arc<PvaLink>>>,
/// Round-36 (R36-G1): in-flight open dedup. The original
/// `get_or_open` used a textbook DCL — read-lock → open →
/// write-lock → DCL drop loser. Two concurrent first-callers
/// both reached `PvaLink::open` (which spawns a monitor task
/// and a `PvaClient`); the loser's resources cleaned up via
/// Drop, but two upstream search/connect round-trips and two
/// monitor-task spawns were spent for one user-visible result.
/// This map carries an `Arc<Notify>` per in-flight open; the
/// second caller awaits and then reads the cached entry.
pending: RwLock<HashMap<RegistryKey, Arc<Notify>>>,
}
impl PvaLinkRegistry {
pub fn new() -> Self {
Self::default()
}
/// Synchronous lookup of an already-open link. Returns `None`
/// if no link with the given `(pv_name, direction)` has been
/// opened yet. Used by the record-link hot path to skip the
/// async runtime when the link is already cached.
pub fn try_get(&self, pv_name: &str, direction: LinkDirection) -> Option<Arc<PvaLink>> {
self.map
.read()
.get(&(pv_name.to_string(), direction))
.cloned()
}
/// Get an existing link or open a new one. Concurrent calls
/// for the same key share one [`PvaLink::open`] invocation;
/// the second caller awaits via `pending` and reads the
/// winner's cached entry.
pub async fn get_or_open(&self, config: PvaLinkConfig) -> PvaLinkResult<Arc<PvaLink>> {
let key: RegistryKey = (config.pv_name.clone(), config.direction);
// Fast path: already cached.
if let Some(existing) = self.map.read().get(&key).cloned() {
return Ok(existing);
}
// In-flight dedup. Either we claim the slot and open, or we
// grab a Notify and await another task's completion.
//
// R49-G3: re-check the cache UNDER the `pending.write()`
// lock. The fast path (line 56-58) reads `self.map` without
// any synchronization vs. the winner's publish-then-clear
// sequence: a late caller could see a fast-path miss, then
// by the time it reaches `pending.write()` the winner has
// already published to the map AND removed the pending slot.
// Without this re-check the late caller would find pending
// empty, claim the slot itself, and run `PvaLink::open` a
// second time — defeating the singleflight invariant. The
// map read is held inside the same critical section as the
// pending lookup, so the only path that can publish to the
// map between the two checks (winner publish at line ~155
// BEFORE clearing pending) is now observable.
let (claim, notify) = {
let mut pending = self.pending.write();
if let Some(existing) = self.map.read().get(&key).cloned() {
return Ok(existing);
}
if let Some(existing) = pending.get(&key).cloned() {
(false, existing)
} else {
let n = Arc::new(Notify::new());
pending.insert(key.clone(), n.clone());
(true, n)
}
};
if !claim {
// Loser path: wait for the winner to finish, then read
// the cached entry.
//
// R48-G1: must register as a waiter BEFORE re-checking
// the map. Tokio `Notify::notify_waiters()` does not
// buffer permits, so the winner's guard drop can wake
// every currently-registered waiter and then leave a
// late `.notified().await` first-poll permanently
// pending. The fix mirrors the
// `enable()`-then-recheck-then-await pattern in
// `epics-pva-rs/src/server_native/tcp.rs` (and the
// tokio Notify docs): pin the future, call
// `enable()` to register the waker synchronously, then
// re-read the map. A wake that fires between the pin
// and the await is now safely observed.
let notified = notify.notified();
tokio::pin!(notified);
notified.as_mut().enable();
if let Some(existing) = self.map.read().get(&key).cloned() {
return Ok(existing);
}
// Winner may have already finished + cleared the
// pending slot; in that case the map should hold the
// entry (just re-checked above). If we get here the
// winner errored and the entry is absent — recurse.
if !self.pending.read().contains_key(&key) {
return Box::pin(self.get_or_open(config)).await;
}
notified.await;
if let Some(existing) = self.map.read().get(&key).cloned() {
return Ok(existing);
}
// Winner errored — recurse so this caller now becomes
// the claimant.
return Box::pin(self.get_or_open(config)).await;
}
// Winner path: run the open with a drop-guard that always
// clears the pending slot and wakes waiters — even on
// panic / cancellation / error.
struct CompletionGuard<'a> {
owner: &'a PvaLinkRegistry,
key: RegistryKey,
notify: Arc<Notify>,
armed: bool,
}
impl<'a> CompletionGuard<'a> {
fn disarm(&mut self) {
self.armed = false;
}
}
impl<'a> Drop for CompletionGuard<'a> {
fn drop(&mut self) {
self.owner.pending.write().remove(&self.key);
self.notify.notify_waiters();
let _ = self.armed; // suppress unused warning
}
}
let mut guard = CompletionGuard {
owner: self,
key: key.clone(),
notify,
armed: true,
};
let result = PvaLink::open(config).await;
let link = Arc::new(result?);
// Publish to the cache before releasing the pending slot so
// waiters that wake up see the cached entry immediately.
self.map.write().insert(key.clone(), link.clone());
guard.disarm();
// Manually run cleanup now (guard's Drop also clears, but
// we want it deterministic on the success path).
Ok(link)
}
pub fn close_all(&self) {
self.map.write().clear();
self.pending.write().clear();
}
pub fn len(&self) -> usize {
self.map.read().len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn close_all_empties_registry() {
let reg = PvaLinkRegistry::new();
// Don't actually open links (would require a running PVA server);
// just exercise the empty-state APIs.
assert!(reg.is_empty());
reg.close_all();
assert_eq!(reg.len(), 0);
}
}