Skip to main content

hashtree_network/
session.rs

1use anyhow::Result;
2use async_trait::async_trait;
3use std::sync::Arc;
4use std::time::Duration;
5
6use nostr_sdk::nostr::{Event, Filter};
7
8use super::bluetooth_peer::BluetoothPeer;
9use super::peer::Peer;
10use super::types::{MeshNostrFrame, PeerHTLConfig};
11use crate::mesh_session::MeshSession;
12use crate::runtime_peer::PeerTransport;
13
14#[derive(Clone)]
15pub enum MeshPeer {
16    WebRtc(Arc<Peer>),
17    Bluetooth(Arc<BluetoothPeer>),
18    #[cfg(test)]
19    Mock(Arc<TestMeshPeer>),
20}
21
22impl MeshPeer {
23    pub fn is_ready(&self) -> bool {
24        match self {
25            Self::WebRtc(peer) => peer.has_data_channel(),
26            Self::Bluetooth(peer) => peer.is_connected(),
27            #[cfg(test)]
28            Self::Mock(peer) => peer.ready,
29        }
30    }
31
32    pub fn is_connected(&self) -> bool {
33        match self {
34            Self::WebRtc(peer) => peer.is_connected(),
35            Self::Bluetooth(peer) => peer.is_connected(),
36            #[cfg(test)]
37            Self::Mock(peer) => peer.connected,
38        }
39    }
40
41    pub fn htl_config(&self) -> PeerHTLConfig {
42        match self {
43            Self::WebRtc(peer) => *peer.htl_config(),
44            Self::Bluetooth(peer) => *peer.htl_config(),
45            #[cfg(test)]
46            Self::Mock(peer) => peer.htl_config,
47        }
48    }
49
50    pub fn transport(&self) -> PeerTransport {
51        match self {
52            Self::WebRtc(_) => PeerTransport::WebRtc,
53            Self::Bluetooth(_) => PeerTransport::Bluetooth,
54            #[cfg(test)]
55            Self::Mock(_) => PeerTransport::WebRtc,
56        }
57    }
58
59    pub async fn request(&self, hash_hex: &str, timeout: Duration) -> Result<Option<Vec<u8>>> {
60        match self {
61            Self::WebRtc(peer) => peer.request_with_timeout(hash_hex, timeout).await,
62            Self::Bluetooth(peer) => peer.request_with_timeout(hash_hex, timeout).await,
63            #[cfg(test)]
64            Self::Mock(peer) => peer.request(hash_hex, timeout).await,
65        }
66    }
67
68    pub async fn query_nostr_events(
69        &self,
70        filters: Vec<Filter>,
71        timeout: Duration,
72    ) -> Result<Vec<Event>> {
73        match self {
74            Self::WebRtc(peer) => peer.query_nostr_events(filters, timeout).await,
75            Self::Bluetooth(peer) => peer.query_nostr_events(filters, timeout).await,
76            #[cfg(test)]
77            Self::Mock(peer) => peer.query_nostr_events(filters, timeout).await,
78        }
79    }
80
81    pub async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
82        match self {
83            Self::WebRtc(peer) => peer.send_mesh_frame_text(frame).await,
84            Self::Bluetooth(peer) => peer.send_mesh_frame_text(frame).await,
85            #[cfg(test)]
86            Self::Mock(peer) => peer.send_mesh_frame_text(frame).await,
87        }
88    }
89
90    pub async fn close(&self) -> Result<()> {
91        match self {
92            Self::WebRtc(peer) => peer.close().await,
93            Self::Bluetooth(peer) => peer.close().await,
94            #[cfg(test)]
95            Self::Mock(peer) => peer.close().await,
96        }
97    }
98
99    pub fn as_webrtc(&self) -> Option<&Arc<Peer>> {
100        match self {
101            Self::WebRtc(peer) => Some(peer),
102            Self::Bluetooth(_) => None,
103            #[cfg(test)]
104            Self::Mock(_) => None,
105        }
106    }
107}
108
109#[async_trait]
110impl MeshSession for MeshPeer {
111    fn is_ready(&self) -> bool {
112        Self::is_ready(self)
113    }
114
115    fn is_connected(&self) -> bool {
116        Self::is_connected(self)
117    }
118
119    fn htl_config(&self) -> PeerHTLConfig {
120        Self::htl_config(self)
121    }
122
123    async fn request(&self, hash_hex: &str, timeout: Duration) -> Result<Option<Vec<u8>>> {
124        Self::request(self, hash_hex, timeout).await
125    }
126
127    async fn query_nostr_events(
128        &self,
129        filters: Vec<Filter>,
130        timeout: Duration,
131    ) -> Result<Vec<Event>> {
132        Self::query_nostr_events(self, filters, timeout).await
133    }
134
135    async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
136        Self::send_mesh_frame_text(self, frame).await
137    }
138
139    async fn close(&self) -> Result<()> {
140        Self::close(self).await
141    }
142
143    fn transport_debug_state(&self) -> Option<String> {
144        match self {
145            Self::WebRtc(peer) => Some(format!("{:?}", peer.state())),
146            Self::Bluetooth(_) => None,
147            #[cfg(test)]
148            Self::Mock(_) => None,
149        }
150    }
151}
152
153#[cfg(test)]
154use anyhow::anyhow;
155
/// Scripted peer used behind `MeshPeer::Mock` in test builds.
///
/// The public fields are read directly by `MeshPeer`; the private ones hold
/// canned replies, artificial delays and a record of what the mock was asked
/// to do.
#[cfg(test)]
pub struct TestMeshPeer {
    /// Value reported by `MeshPeer::is_ready`.
    pub ready: bool,
    /// Value reported by `MeshPeer::is_connected`.
    pub connected: bool,
    /// Value reported by `MeshPeer::htl_config`.
    pub htl_config: PeerHTLConfig,
    /// Canned reply handed out (cloned) by `request`.
    request_response: tokio::sync::Mutex<Option<Vec<u8>>>,
    /// How long `request` sleeps before replying; zero means no sleep.
    response_delay: Duration,
    /// Canned events handed out (cloned) by `query_nostr_events`.
    query_events: tokio::sync::Mutex<Vec<Event>>,
    /// How long `query_nostr_events` sleeps before replying; zero means no sleep.
    query_delay: Duration,
    /// Every frame passed to `send_mesh_frame_text`, in call order.
    sent_frames: tokio::sync::Mutex<Vec<MeshNostrFrame>>,
    /// How long `close` sleeps before marking the peer closed; zero means no sleep.
    close_delay: Duration,
    /// Set to `true` once `close` has completed.
    closed: std::sync::atomic::AtomicBool,
}
169
#[cfg(test)]
impl TestMeshPeer {
    /// Baseline every public constructor starts from: ready, connected,
    /// `PeerHTLConfig::from_flags(false, false)`, no canned data, no delays,
    /// not closed.
    fn idle() -> Self {
        Self {
            ready: true,
            connected: true,
            htl_config: PeerHTLConfig::from_flags(false, false),
            request_response: tokio::sync::Mutex::new(None),
            response_delay: Duration::ZERO,
            query_events: tokio::sync::Mutex::new(Vec::new()),
            query_delay: Duration::ZERO,
            sent_frames: tokio::sync::Mutex::new(Vec::new()),
            close_delay: Duration::ZERO,
            closed: std::sync::atomic::AtomicBool::new(false),
        }
    }

    /// Sleeps for `delay` unless it is zero, in which case it returns without
    /// touching the timer at all.
    async fn pause(delay: Duration) {
        if delay.is_zero() {
            return;
        }
        tokio::time::sleep(delay).await;
    }

    /// Mock whose `request` answers immediately with `response`.
    pub fn with_response(response: Option<Vec<u8>>) -> Self {
        Self {
            request_response: tokio::sync::Mutex::new(response),
            ..Self::idle()
        }
    }

    /// Mock whose `request` answers with `response` after `response_delay`.
    pub fn with_delayed_response(response: Option<Vec<u8>>, response_delay: Duration) -> Self {
        Self {
            request_response: tokio::sync::Mutex::new(response),
            response_delay,
            ..Self::idle()
        }
    }

    /// Mock whose `query_nostr_events` answers immediately with `events`.
    pub fn with_events(events: Vec<Event>) -> Self {
        Self {
            query_events: tokio::sync::Mutex::new(events),
            ..Self::idle()
        }
    }

    /// Mock whose `query_nostr_events` answers with `events` after `query_delay`.
    pub fn with_delayed_events(events: Vec<Event>, query_delay: Duration) -> Self {
        Self {
            query_events: tokio::sync::Mutex::new(events),
            query_delay,
            ..Self::idle()
        }
    }

    /// Mock whose `close` takes `close_delay` to finish.
    ///
    /// Unlike the other constructors, this one starts with `connected` set to
    /// `false` (while `ready` stays `true`).
    pub fn with_delayed_close(close_delay: Duration) -> Self {
        Self {
            connected: false,
            close_delay,
            ..Self::idle()
        }
    }

    /// Waits out the configured response delay, then returns a clone of the
    /// canned response. Both arguments are ignored.
    pub async fn request(&self, _hash_hex: &str, _timeout: Duration) -> Result<Option<Vec<u8>>> {
        Self::pause(self.response_delay).await;
        let canned = self.request_response.lock().await;
        Ok(canned.clone())
    }

    /// Waits out the configured query delay, then returns a clone of the
    /// canned events. Both arguments are ignored.
    pub async fn query_nostr_events(
        &self,
        _filters: Vec<Filter>,
        _timeout: Duration,
    ) -> Result<Vec<Event>> {
        Self::pause(self.query_delay).await;
        let canned = self.query_events.lock().await;
        Ok(canned.clone())
    }

    /// Records a clone of `frame`; always succeeds.
    pub async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
        let mut log = self.sent_frames.lock().await;
        log.push(frame.clone());
        Ok(())
    }

    /// Waits out the configured close delay, then marks the mock as closed.
    pub async fn close(&self) -> Result<()> {
        use std::sync::atomic::Ordering;

        Self::pause(self.close_delay).await;
        self.closed.store(true, Ordering::Relaxed);
        Ok(())
    }

    /// Number of frames recorded by `send_mesh_frame_text` so far.
    pub async fn sent_frame_count(&self) -> usize {
        let log = self.sent_frames.lock().await;
        log.len()
    }

    /// Whether `close` has run to completion.
    pub fn is_closed(&self) -> bool {
        use std::sync::atomic::Ordering;

        self.closed.load(Ordering::Relaxed)
    }
}
287
#[cfg(test)]
impl MeshPeer {
    /// Wraps `peer` in a `MeshPeer::Mock` handle.
    pub fn mock_for_tests(peer: TestMeshPeer) -> Self {
        let shared = Arc::new(peer);
        Self::Mock(shared)
    }

    /// Borrows the inner mock.
    ///
    /// # Errors
    ///
    /// Fails with "mesh peer is not a mock" when this handle wraps a real
    /// (WebRTC or Bluetooth) peer.
    pub fn mock_ref(&self) -> Result<&Arc<TestMeshPeer>> {
        match self {
            Self::Mock(mock) => Ok(mock),
            // Variants are listed explicitly so that adding a new one forces
            // this match to be revisited.
            Self::WebRtc(_) | Self::Bluetooth(_) => Err(anyhow!("mesh peer is not a mock")),
        }
    }
}