Skip to main content

opcua_server/
reverse_connect.rs

1use std::{
2    net::SocketAddr,
3    sync::Arc,
4    time::{Duration, Instant},
5};
6
7use hashbrown::HashMap;
8use opcua_core::{
9    sync::{Mutex, RwLock},
10    trace_lock, trace_read_lock, trace_write_lock,
11};
12use opcua_types::StatusCode;
13use tokio::sync::Notify;
14
/// Configuration for a reverse connect target.
///
/// Derives `Debug` in addition to `Clone` so the configuration can be logged
/// and embedded in other `Debug` types.
#[derive(Debug, Clone)]
pub struct ReverseConnectTargetConfig {
    /// The client address the server should connect to.
    pub address: SocketAddr,
    /// The endpoint URL in the reverse hello message, informing the client
    /// of which endpoint this is.
    pub endpoint_url: String,
    /// Unique ID for this reverse connect target.
    pub id: String,
}
26
/// Connection state of a single reverse connect target.
enum ReverseConnectStateType {
    /// The last attempt failed. The payload is the instant at which the
    /// failure was recorded; the manager adds its retry delay to this instant
    /// to decide when the target may be tried again.
    Failed(Instant),
    /// The connection was reported as successfully established.
    Connected,
    /// Currently trying to connect.
    Connecting,
    /// Not currently trying to connect, waiting to connect. This is the
    /// state a newly added target starts in.
    Waiting,
}
37
/// A registered reverse connect target: its configuration together with its
/// current connection state.
struct ReverseConnectTarget {
    /// The configuration this target was registered with.
    config: ReverseConnectTargetConfig,
    /// Connection state. Reference counted because it is also handed to the
    /// handle created for each connection attempt.
    state: Arc<Mutex<ReverseConnectState>>,
}
42
/// Mutable per-target state, stored behind the mutex in `ReverseConnectTarget`.
struct ReverseConnectState {
    /// Where the target currently is in its connect/retry life cycle.
    state: ReverseConnectStateType,
}
46
47pub(crate) struct ReverseConnectionInstanceHandle {
48    state: Arc<Mutex<ReverseConnectState>>,
49    notify: Arc<Notify>,
50}
51
52impl ReverseConnectionInstanceHandle {
53    fn new(state: Arc<Mutex<ReverseConnectState>>, notify: Arc<Notify>) -> Self {
54        trace_lock!(state).state = ReverseConnectStateType::Connecting;
55        Self { state, notify }
56    }
57
58    pub(crate) fn set_result(&self, status: StatusCode) {
59        let mut state = trace_lock!(self.state);
60        state.state = if status.is_good() {
61            ReverseConnectStateType::Connected
62        } else {
63            ReverseConnectStateType::Failed(Instant::now())
64        };
65        self.notify.notify_waiters();
66    }
67}
68
69impl Drop for ReverseConnectionInstanceHandle {
70    fn drop(&mut self) {
71        let mut state = trace_lock!(self.state);
72        // Once the handle is dropped we're no longer connected.
73        state.state = ReverseConnectStateType::Waiting;
74        self.notify.notify_waiters();
75    }
76}
77
78pub(crate) struct ReverseConnectionManager {
79    active_targets: Arc<RwLock<HashMap<String, ReverseConnectTarget>>>,
80    notify: Arc<tokio::sync::Notify>,
81    failure_retry: Duration,
82}
83
84#[derive(Clone)]
85pub(crate) struct ReverseConnectHandle {
86    active_targets: Arc<RwLock<HashMap<String, ReverseConnectTarget>>>,
87    notify: Arc<tokio::sync::Notify>,
88}
89
90impl ReverseConnectHandle {
91    pub(crate) fn add_target(&self, target: ReverseConnectTargetConfig) {
92        let mut targets = trace_write_lock!(self.active_targets);
93        targets
94            .entry(target.id.clone())
95            .or_insert_with(|| ReverseConnectTarget {
96                config: target,
97                state: Arc::new(Mutex::new(ReverseConnectState {
98                    state: ReverseConnectStateType::Waiting,
99                })),
100            });
101        self.notify.notify_waiters();
102    }
103
104    pub(crate) fn remove_target(&self, id: &str) {
105        let mut targets = trace_write_lock!(self.active_targets);
106        targets.remove(id);
107    }
108}
109
110pub(crate) struct PendingReverseConnection {
111    pub target: ReverseConnectTargetConfig,
112    pub handle: ReverseConnectionInstanceHandle,
113}
114
115impl PendingReverseConnection {
116    fn new(target: ReverseConnectTargetConfig, handle: ReverseConnectionInstanceHandle) -> Self {
117        Self { target, handle }
118    }
119}
120
121impl ReverseConnectionManager {
122    pub(crate) fn new(failure_retry: Duration) -> (Self, ReverseConnectHandle) {
123        let active_targets = Arc::new(RwLock::new(HashMap::new()));
124        let notify = Arc::new(tokio::sync::Notify::new());
125        (
126            Self {
127                active_targets: active_targets.clone(),
128                notify: notify.clone(),
129                failure_retry,
130            },
131            ReverseConnectHandle {
132                active_targets,
133                notify,
134            },
135        )
136    }
137
138    pub(crate) async fn wait_for_connection(&self) -> PendingReverseConnection {
139        loop {
140            let mut next_wait_for = None;
141            let notified = self.notify.notified();
142            {
143                let targets = trace_read_lock!(self.active_targets);
144                for target in targets.values() {
145                    {
146                        let state = trace_lock!(target.state);
147                        // Check if we should connect, and store the next time we should wake up if we have any rejected connections.
148                        match &state.state {
149                            ReverseConnectStateType::Failed(time) => {
150                                let next_time = *time + self.failure_retry;
151                                if Instant::now() < next_time {
152                                    match next_wait_for {
153                                        Some(next) if next < next_time => {}
154                                        _ => {
155                                            next_wait_for = Some(next_time);
156                                        }
157                                    }
158                                    continue;
159                                }
160                            }
161                            ReverseConnectStateType::Connecting
162                            | ReverseConnectStateType::Connected => {
163                                continue;
164                            }
165                            ReverseConnectStateType::Waiting => {}
166                        }
167                    }
168                    return PendingReverseConnection::new(
169                        target.config.clone(),
170                        ReverseConnectionInstanceHandle::new(
171                            target.state.clone(),
172                            self.notify.clone(),
173                        ),
174                    );
175                }
176            }
177
178            let next_fut = match next_wait_for {
179                Some(time) => futures::future::Either::Left(tokio::time::sleep_until(time.into())),
180                None => futures::future::Either::Right(futures::future::pending()),
181            };
182            tokio::select! {
183                _ = notified => {}
184                _ = next_fut => {}
185            }
186        }
187    }
188}