opcua_server/
reverse_connect.rs1use std::{
2 net::SocketAddr,
3 sync::Arc,
4 time::{Duration, Instant},
5};
6
7use hashbrown::HashMap;
8use opcua_core::{
9 sync::{Mutex, RwLock},
10 trace_lock, trace_read_lock, trace_write_lock,
11};
12use opcua_types::StatusCode;
13use tokio::sync::Notify;
14
/// Configuration of a single reverse connect target.
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Clone` so that the
/// configuration can be logged and compared.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReverseConnectTargetConfig {
    /// Socket address of the target.
    pub address: SocketAddr,
    /// Endpoint URL associated with this target. It is stored as-is; this
    /// module does not interpret it.
    pub endpoint_url: String,
    /// Identifier of the target. It is the key under which the target is
    /// registered and removed.
    pub id: String,
}
26
/// Connection state tracked for each reverse connect target.
enum ReverseConnectStateType {
    /// The last attempt reported a non-good status at the contained instant.
    /// The manager adds its retry delay to this instant before offering the
    /// target again.
    Failed(Instant),
    /// The last attempt reported a good status.
    Connected,
    /// A connection instance handle has been created for the target and no
    /// result has been reported yet.
    Connecting,
    /// The target is idle and may be handed out for a connection attempt.
    Waiting,
}
37
38struct ReverseConnectTarget {
39 config: ReverseConnectTargetConfig,
40 state: Arc<Mutex<ReverseConnectState>>,
41}
42
43struct ReverseConnectState {
44 state: ReverseConnectStateType,
45}
46
47pub(crate) struct ReverseConnectionInstanceHandle {
48 state: Arc<Mutex<ReverseConnectState>>,
49 notify: Arc<Notify>,
50}
51
52impl ReverseConnectionInstanceHandle {
53 fn new(state: Arc<Mutex<ReverseConnectState>>, notify: Arc<Notify>) -> Self {
54 trace_lock!(state).state = ReverseConnectStateType::Connecting;
55 Self { state, notify }
56 }
57
58 pub(crate) fn set_result(&self, status: StatusCode) {
59 let mut state = trace_lock!(self.state);
60 state.state = if status.is_good() {
61 ReverseConnectStateType::Connected
62 } else {
63 ReverseConnectStateType::Failed(Instant::now())
64 };
65 self.notify.notify_waiters();
66 }
67}
68
69impl Drop for ReverseConnectionInstanceHandle {
70 fn drop(&mut self) {
71 let mut state = trace_lock!(self.state);
72 state.state = ReverseConnectStateType::Waiting;
74 self.notify.notify_waiters();
75 }
76}
77
78pub(crate) struct ReverseConnectionManager {
79 active_targets: Arc<RwLock<HashMap<String, ReverseConnectTarget>>>,
80 notify: Arc<tokio::sync::Notify>,
81 failure_retry: Duration,
82}
83
84#[derive(Clone)]
85pub(crate) struct ReverseConnectHandle {
86 active_targets: Arc<RwLock<HashMap<String, ReverseConnectTarget>>>,
87 notify: Arc<tokio::sync::Notify>,
88}
89
90impl ReverseConnectHandle {
91 pub(crate) fn add_target(&self, target: ReverseConnectTargetConfig) {
92 let mut targets = trace_write_lock!(self.active_targets);
93 targets
94 .entry(target.id.clone())
95 .or_insert_with(|| ReverseConnectTarget {
96 config: target,
97 state: Arc::new(Mutex::new(ReverseConnectState {
98 state: ReverseConnectStateType::Waiting,
99 })),
100 });
101 self.notify.notify_waiters();
102 }
103
104 pub(crate) fn remove_target(&self, id: &str) {
105 let mut targets = trace_write_lock!(self.active_targets);
106 targets.remove(id);
107 }
108}
109
110pub(crate) struct PendingReverseConnection {
111 pub target: ReverseConnectTargetConfig,
112 pub handle: ReverseConnectionInstanceHandle,
113}
114
115impl PendingReverseConnection {
116 fn new(target: ReverseConnectTargetConfig, handle: ReverseConnectionInstanceHandle) -> Self {
117 Self { target, handle }
118 }
119}
120
121impl ReverseConnectionManager {
122 pub(crate) fn new(failure_retry: Duration) -> (Self, ReverseConnectHandle) {
123 let active_targets = Arc::new(RwLock::new(HashMap::new()));
124 let notify = Arc::new(tokio::sync::Notify::new());
125 (
126 Self {
127 active_targets: active_targets.clone(),
128 notify: notify.clone(),
129 failure_retry,
130 },
131 ReverseConnectHandle {
132 active_targets,
133 notify,
134 },
135 )
136 }
137
138 pub(crate) async fn wait_for_connection(&self) -> PendingReverseConnection {
139 loop {
140 let mut next_wait_for = None;
141 let notified = self.notify.notified();
142 {
143 let targets = trace_read_lock!(self.active_targets);
144 for target in targets.values() {
145 {
146 let state = trace_lock!(target.state);
147 match &state.state {
149 ReverseConnectStateType::Failed(time) => {
150 let next_time = *time + self.failure_retry;
151 if Instant::now() < next_time {
152 match next_wait_for {
153 Some(next) if next < next_time => {}
154 _ => {
155 next_wait_for = Some(next_time);
156 }
157 }
158 continue;
159 }
160 }
161 ReverseConnectStateType::Connecting
162 | ReverseConnectStateType::Connected => {
163 continue;
164 }
165 ReverseConnectStateType::Waiting => {}
166 }
167 }
168 return PendingReverseConnection::new(
169 target.config.clone(),
170 ReverseConnectionInstanceHandle::new(
171 target.state.clone(),
172 self.notify.clone(),
173 ),
174 );
175 }
176 }
177
178 let next_fut = match next_wait_for {
179 Some(time) => futures::future::Either::Left(tokio::time::sleep_until(time.into())),
180 None => futures::future::Either::Right(futures::future::pending()),
181 };
182 tokio::select! {
183 _ = notified => {}
184 _ = next_fut => {}
185 }
186 }
187 }
188}