1use std::collections::HashMap;
2use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
3use std::sync::Arc;
4use std::time::Duration;
5
6use tokio::sync::RwLock;
7
8use crate::local_bus::SharedLocalNostrBus;
9use crate::mesh_session::{resolve_root_from_local_buses_with_source, MeshSession};
10use crate::root_events::PeerRootEvent;
11use crate::runtime_peer::MeshPeerEntry;
12
13pub struct MeshRuntimeState<P> {
15 pub peers: Arc<RwLock<HashMap<String, MeshPeerEntry<P>>>>,
16 pub connected_count: Arc<AtomicUsize>,
17 pub bytes_sent: AtomicU64,
18 pub bytes_received: AtomicU64,
19 pub mesh_received: AtomicU64,
20 pub mesh_forwarded: AtomicU64,
21 pub mesh_dropped_duplicate: AtomicU64,
22 local_buses: RwLock<Vec<SharedLocalNostrBus>>,
23}
24
25impl<P> Default for MeshRuntimeState<P>
26where
27 P: MeshSession + Send + Sync + 'static,
28{
29 fn default() -> Self {
30 Self::new()
31 }
32}
33
34impl<P> MeshRuntimeState<P>
35where
36 P: MeshSession + Send + Sync + 'static,
37{
38 pub fn new() -> Self {
39 Self {
40 peers: Arc::new(RwLock::new(HashMap::new())),
41 connected_count: Arc::new(AtomicUsize::new(0)),
42 bytes_sent: AtomicU64::new(0),
43 bytes_received: AtomicU64::new(0),
44 mesh_received: AtomicU64::new(0),
45 mesh_forwarded: AtomicU64::new(0),
46 mesh_dropped_duplicate: AtomicU64::new(0),
47 local_buses: RwLock::new(Vec::new()),
48 }
49 }
50
51 pub async fn set_local_buses(&self, buses: Vec<SharedLocalNostrBus>) {
52 *self.local_buses.write().await = buses;
53 }
54
55 pub async fn add_local_bus(&self, bus: SharedLocalNostrBus) {
56 self.local_buses.write().await.push(bus);
57 }
58
59 pub async fn local_buses(&self) -> Vec<SharedLocalNostrBus> {
60 self.local_buses.read().await.clone()
61 }
62
63 pub async fn reset(&self) {
64 self.set_local_buses(Vec::new()).await;
65 let peers = {
66 let mut peers = self.peers.write().await;
67 std::mem::take(&mut *peers)
68 };
69 self.connected_count.store(0, Ordering::Relaxed);
70 for entry in peers.into_values() {
71 if let Some(peer) = entry.peer {
72 let _ = peer.close().await;
73 }
74 }
75 }
76
77 pub fn get_bandwidth(&self) -> (u64, u64) {
78 (
79 self.bytes_sent.load(Ordering::Relaxed),
80 self.bytes_received.load(Ordering::Relaxed),
81 )
82 }
83
84 pub fn get_mesh_stats(&self) -> (u64, u64, u64) {
85 (
86 self.mesh_received.load(Ordering::Relaxed),
87 self.mesh_forwarded.load(Ordering::Relaxed),
88 self.mesh_dropped_duplicate.load(Ordering::Relaxed),
89 )
90 }
91
92 pub fn record_mesh_received(&self) {
93 self.mesh_received.fetch_add(1, Ordering::Relaxed);
94 }
95
96 pub fn record_mesh_forwarded(&self, count: u64) {
97 self.mesh_forwarded.fetch_add(count, Ordering::Relaxed);
98 }
99
100 pub fn record_mesh_duplicate_drop(&self) {
101 self.mesh_dropped_duplicate.fetch_add(1, Ordering::Relaxed);
102 }
103
104 pub async fn record_sent(&self, peer_id: &str, bytes: u64) {
105 self.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
106 if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
107 entry.bytes_sent += bytes;
108 }
109 }
110
111 pub async fn record_received(&self, peer_id: &str, bytes: u64) {
112 self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
113 if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
114 entry.bytes_received += bytes;
115 }
116 }
117
118 pub async fn resolve_root_from_local_buses_with_source(
119 &self,
120 owner_pubkey: &str,
121 tree_name: &str,
122 timeout: Duration,
123 ) -> Option<(&'static str, PeerRootEvent)> {
124 resolve_root_from_local_buses_with_source(
125 self.local_buses().await,
126 owner_pubkey,
127 tree_name,
128 timeout,
129 )
130 .await
131 }
132
133 pub async fn resolve_root_from_local_buses(
134 &self,
135 owner_pubkey: &str,
136 tree_name: &str,
137 timeout: Duration,
138 ) -> Option<PeerRootEvent> {
139 self.resolve_root_from_local_buses_with_source(owner_pubkey, tree_name, timeout)
140 .await
141 .map(|(_, root)| root)
142 }
143}
144
145#[cfg(test)]
146mod tests {
147 use super::*;
148 use anyhow::Result;
149 use async_trait::async_trait;
150 use nostr_sdk::nostr::{Event, Filter};
151 use std::collections::BTreeSet;
152 use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
153 use std::time::Instant;
154
155 use crate::local_bus::LocalNostrBus;
156 use crate::runtime_peer::{
157 ConnectionState, MeshPeerEntry, PeerDirection, PeerSignalPath, PeerTransport,
158 };
159 use crate::types::{MeshNostrFrame, PeerHTLConfig, PeerId, PeerPool};
160
161 struct TestSession {
162 closed: AtomicBool,
163 }
164
165 #[async_trait]
166 impl MeshSession for TestSession {
167 fn is_ready(&self) -> bool {
168 true
169 }
170
171 fn is_connected(&self) -> bool {
172 true
173 }
174
175 fn htl_config(&self) -> PeerHTLConfig {
176 PeerHTLConfig::from_flags(false, false)
177 }
178
179 async fn request(&self, _hash_hex: &str, _timeout: Duration) -> Result<Option<Vec<u8>>> {
180 Ok(None)
181 }
182
183 async fn query_nostr_events(
184 &self,
185 _filters: Vec<Filter>,
186 _timeout: Duration,
187 ) -> Result<Vec<Event>> {
188 Ok(Vec::new())
189 }
190
191 async fn send_mesh_frame_text(&self, _frame: &MeshNostrFrame) -> Result<()> {
192 Ok(())
193 }
194
195 async fn close(&self) -> Result<()> {
196 self.closed.store(true, AtomicOrdering::Relaxed);
197 Ok(())
198 }
199 }
200
201 struct TestLocalBus {
202 source: &'static str,
203 root: Option<PeerRootEvent>,
204 }
205
206 #[async_trait]
207 impl LocalNostrBus for TestLocalBus {
208 fn source_name(&self) -> &'static str {
209 self.source
210 }
211
212 async fn broadcast_event(&self, _event: &Event) -> Result<()> {
213 Ok(())
214 }
215
216 async fn query_root(
217 &self,
218 _owner_pubkey: &str,
219 _tree_name: &str,
220 _timeout: Duration,
221 ) -> Option<PeerRootEvent> {
222 self.root.clone()
223 }
224 }
225
226 #[tokio::test]
227 async fn record_updates_global_and_per_peer_counters() {
228 let runtime = MeshRuntimeState::<TestSession>::new();
229 let peer_id = PeerId::new("peer-a".to_string());
230 let peer_key = peer_id.to_string();
231 runtime.peers.write().await.insert(
232 peer_key.clone(),
233 MeshPeerEntry {
234 peer_id,
235 direction: PeerDirection::Outbound,
236 state: ConnectionState::Connected,
237 last_seen: Instant::now(),
238 peer: None,
239 pool: PeerPool::Other,
240 transport: PeerTransport::WebRtc,
241 signal_paths: BTreeSet::from([PeerSignalPath::Relay]),
242 bytes_sent: 0,
243 bytes_received: 0,
244 },
245 );
246
247 runtime.record_sent(&peer_key, 16).await;
248 runtime.record_received(&peer_key, 32).await;
249
250 assert_eq!(runtime.get_bandwidth(), (16, 32));
251 let peers = runtime.peers.read().await;
252 let entry = peers.get(&peer_key).expect("peer");
253 assert_eq!(entry.bytes_sent, 16);
254 assert_eq!(entry.bytes_received, 32);
255 }
256
257 #[tokio::test]
258 async fn reset_closes_peers_and_clears_local_buses() {
259 let runtime = MeshRuntimeState::<TestSession>::new();
260 let session = TestSession {
261 closed: AtomicBool::new(false),
262 };
263 let peer_id = PeerId::new("peer-a".to_string());
264 runtime.peers.write().await.insert(
265 peer_id.to_string(),
266 MeshPeerEntry {
267 peer_id,
268 direction: PeerDirection::Outbound,
269 state: ConnectionState::Connected,
270 last_seen: Instant::now(),
271 peer: Some(session),
272 pool: PeerPool::Other,
273 transport: PeerTransport::Bluetooth,
274 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
275 bytes_sent: 0,
276 bytes_received: 0,
277 },
278 );
279 runtime.connected_count.store(1, Ordering::Relaxed);
280 runtime
281 .set_local_buses(vec![Arc::new(TestLocalBus {
282 source: "mock",
283 root: None,
284 }) as SharedLocalNostrBus])
285 .await;
286
287 runtime.reset().await;
288
289 assert_eq!(runtime.connected_count.load(Ordering::Relaxed), 0);
290 assert!(runtime.peers.read().await.is_empty());
291 assert!(runtime.local_buses().await.is_empty());
292 }
293
294 #[tokio::test]
295 async fn resolve_root_from_local_buses_returns_first_match() {
296 let runtime = MeshRuntimeState::<TestSession>::new();
297 let root = PeerRootEvent {
298 hash: "ab".repeat(32),
299 key: None,
300 encrypted_key: None,
301 self_encrypted_key: None,
302 event_id: "event-1".to_string(),
303 created_at: 1,
304 peer_id: "bus-peer".to_string(),
305 };
306 runtime
307 .set_local_buses(vec![
308 Arc::new(TestLocalBus {
309 source: "empty",
310 root: None,
311 }) as SharedLocalNostrBus,
312 Arc::new(TestLocalBus {
313 source: "mock-bus",
314 root: Some(root.clone()),
315 }) as SharedLocalNostrBus,
316 ])
317 .await;
318
319 let resolved = runtime
320 .resolve_root_from_local_buses_with_source("owner", "tree", Duration::from_millis(10))
321 .await
322 .expect("root");
323
324 assert_eq!(resolved.0, "mock-bus");
325 assert_eq!(resolved.1, root);
326 }
327}