Skip to main content

hashtree_network/
runtime_state.rs

1use std::collections::HashMap;
2use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
3use std::sync::Arc;
4use std::time::Duration;
5
6use tokio::sync::RwLock;
7
8use crate::local_bus::SharedLocalNostrBus;
9use crate::mesh_session::{resolve_root_from_local_buses_with_source, MeshSession};
10use crate::root_events::PeerRootEvent;
11use crate::runtime_peer::MeshPeerEntry;
12
13/// Shared runtime state for transport-backed mesh peers.
14pub struct MeshRuntimeState<P> {
15    pub peers: Arc<RwLock<HashMap<String, MeshPeerEntry<P>>>>,
16    pub connected_count: Arc<AtomicUsize>,
17    pub bytes_sent: AtomicU64,
18    pub bytes_received: AtomicU64,
19    pub mesh_received: AtomicU64,
20    pub mesh_forwarded: AtomicU64,
21    pub mesh_dropped_duplicate: AtomicU64,
22    local_buses: RwLock<Vec<SharedLocalNostrBus>>,
23}
24
25impl<P> Default for MeshRuntimeState<P>
26where
27    P: MeshSession + Send + Sync + 'static,
28{
29    fn default() -> Self {
30        Self::new()
31    }
32}
33
34impl<P> MeshRuntimeState<P>
35where
36    P: MeshSession + Send + Sync + 'static,
37{
38    pub fn new() -> Self {
39        Self {
40            peers: Arc::new(RwLock::new(HashMap::new())),
41            connected_count: Arc::new(AtomicUsize::new(0)),
42            bytes_sent: AtomicU64::new(0),
43            bytes_received: AtomicU64::new(0),
44            mesh_received: AtomicU64::new(0),
45            mesh_forwarded: AtomicU64::new(0),
46            mesh_dropped_duplicate: AtomicU64::new(0),
47            local_buses: RwLock::new(Vec::new()),
48        }
49    }
50
51    pub async fn set_local_buses(&self, buses: Vec<SharedLocalNostrBus>) {
52        *self.local_buses.write().await = buses;
53    }
54
55    pub async fn add_local_bus(&self, bus: SharedLocalNostrBus) {
56        self.local_buses.write().await.push(bus);
57    }
58
59    pub async fn local_buses(&self) -> Vec<SharedLocalNostrBus> {
60        self.local_buses.read().await.clone()
61    }
62
63    pub async fn reset(&self) {
64        self.set_local_buses(Vec::new()).await;
65        let peers = {
66            let mut peers = self.peers.write().await;
67            std::mem::take(&mut *peers)
68        };
69        self.connected_count.store(0, Ordering::Relaxed);
70        for entry in peers.into_values() {
71            if let Some(peer) = entry.peer {
72                let _ = peer.close().await;
73            }
74        }
75    }
76
77    pub fn get_bandwidth(&self) -> (u64, u64) {
78        (
79            self.bytes_sent.load(Ordering::Relaxed),
80            self.bytes_received.load(Ordering::Relaxed),
81        )
82    }
83
84    pub fn get_mesh_stats(&self) -> (u64, u64, u64) {
85        (
86            self.mesh_received.load(Ordering::Relaxed),
87            self.mesh_forwarded.load(Ordering::Relaxed),
88            self.mesh_dropped_duplicate.load(Ordering::Relaxed),
89        )
90    }
91
92    pub fn record_mesh_received(&self) {
93        self.mesh_received.fetch_add(1, Ordering::Relaxed);
94    }
95
96    pub fn record_mesh_forwarded(&self, count: u64) {
97        self.mesh_forwarded.fetch_add(count, Ordering::Relaxed);
98    }
99
100    pub fn record_mesh_duplicate_drop(&self) {
101        self.mesh_dropped_duplicate.fetch_add(1, Ordering::Relaxed);
102    }
103
104    pub async fn record_sent(&self, peer_id: &str, bytes: u64) {
105        self.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
106        if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
107            entry.bytes_sent += bytes;
108        }
109    }
110
111    pub async fn record_received(&self, peer_id: &str, bytes: u64) {
112        self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
113        if let Some(entry) = self.peers.write().await.get_mut(peer_id) {
114            entry.bytes_received += bytes;
115        }
116    }
117
118    pub async fn resolve_root_from_local_buses_with_source(
119        &self,
120        owner_pubkey: &str,
121        tree_name: &str,
122        timeout: Duration,
123    ) -> Option<(&'static str, PeerRootEvent)> {
124        resolve_root_from_local_buses_with_source(
125            self.local_buses().await,
126            owner_pubkey,
127            tree_name,
128            timeout,
129        )
130        .await
131    }
132
133    pub async fn resolve_root_from_local_buses(
134        &self,
135        owner_pubkey: &str,
136        tree_name: &str,
137        timeout: Duration,
138    ) -> Option<PeerRootEvent> {
139        self.resolve_root_from_local_buses_with_source(owner_pubkey, tree_name, timeout)
140            .await
141            .map(|(_, root)| root)
142    }
143}
144
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Result;
    use async_trait::async_trait;
    use nostr_sdk::nostr::{Event, Filter};
    use std::collections::BTreeSet;
    use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
    use std::time::Instant;

    use crate::local_bus::LocalNostrBus;
    use crate::runtime_peer::{
        ConnectionState, MeshPeerEntry, PeerDirection, PeerSignalPath, PeerTransport,
    };
    use crate::types::{MeshNostrFrame, PeerHTLConfig, PeerId, PeerPool};

    /// Minimal `MeshSession` stub that only records whether `close` ran.
    struct TestSession {
        /// Shared with the test body: the session itself is moved into the
        /// runtime and consumed by `reset`, so the flag must outlive it to be
        /// observable afterwards.
        closed: Arc<AtomicBool>,
    }

    #[async_trait]
    impl MeshSession for TestSession {
        fn is_ready(&self) -> bool {
            true
        }

        fn is_connected(&self) -> bool {
            true
        }

        fn htl_config(&self) -> PeerHTLConfig {
            PeerHTLConfig::from_flags(false, false)
        }

        async fn request(&self, _hash_hex: &str, _timeout: Duration) -> Result<Option<Vec<u8>>> {
            Ok(None)
        }

        async fn query_nostr_events(
            &self,
            _filters: Vec<Filter>,
            _timeout: Duration,
        ) -> Result<Vec<Event>> {
            Ok(Vec::new())
        }

        async fn send_mesh_frame_text(&self, _frame: &MeshNostrFrame) -> Result<()> {
            Ok(())
        }

        async fn close(&self) -> Result<()> {
            self.closed.store(true, AtomicOrdering::Relaxed);
            Ok(())
        }
    }

    /// Local bus stub that answers every root query with a fixed value.
    struct TestLocalBus {
        source: &'static str,
        root: Option<PeerRootEvent>,
    }

    #[async_trait]
    impl LocalNostrBus for TestLocalBus {
        fn source_name(&self) -> &'static str {
            self.source
        }

        async fn broadcast_event(&self, _event: &Event) -> Result<()> {
            Ok(())
        }

        async fn query_root(
            &self,
            _owner_pubkey: &str,
            _tree_name: &str,
            _timeout: Duration,
        ) -> Option<PeerRootEvent> {
            self.root.clone()
        }
    }

    /// Recording traffic must update both the global totals and the entry of
    /// the peer the traffic is attributed to.
    #[tokio::test]
    async fn record_updates_global_and_per_peer_counters() {
        let runtime = MeshRuntimeState::<TestSession>::new();
        let peer_id = PeerId::new("peer-a".to_string());
        let peer_key = peer_id.to_string();
        runtime.peers.write().await.insert(
            peer_key.clone(),
            MeshPeerEntry {
                peer_id,
                direction: PeerDirection::Outbound,
                state: ConnectionState::Connected,
                last_seen: Instant::now(),
                peer: None,
                pool: PeerPool::Other,
                transport: PeerTransport::WebRtc,
                signal_paths: BTreeSet::from([PeerSignalPath::Relay]),
                bytes_sent: 0,
                bytes_received: 0,
            },
        );

        runtime.record_sent(&peer_key, 16).await;
        runtime.record_received(&peer_key, 32).await;

        assert_eq!(runtime.get_bandwidth(), (16, 32));
        let peers = runtime.peers.read().await;
        let entry = peers.get(&peer_key).expect("peer");
        assert_eq!(entry.bytes_sent, 16);
        assert_eq!(entry.bytes_received, 32);
    }

    /// `reset` must close every peer session, drop all peers, zero the
    /// connection counter and forget the registered local buses.
    #[tokio::test]
    async fn reset_closes_peers_and_clears_local_buses() {
        let runtime = MeshRuntimeState::<TestSession>::new();
        // Keep a handle on the flag so the close can be verified after the
        // session has been consumed by `reset`.
        let closed = Arc::new(AtomicBool::new(false));
        let session = TestSession {
            closed: Arc::clone(&closed),
        };
        let peer_id = PeerId::new("peer-a".to_string());
        runtime.peers.write().await.insert(
            peer_id.to_string(),
            MeshPeerEntry {
                peer_id,
                direction: PeerDirection::Outbound,
                state: ConnectionState::Connected,
                last_seen: Instant::now(),
                peer: Some(session),
                pool: PeerPool::Other,
                transport: PeerTransport::Bluetooth,
                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
                bytes_sent: 0,
                bytes_received: 0,
            },
        );
        runtime.connected_count.store(1, Ordering::Relaxed);
        runtime
            .set_local_buses(vec![Arc::new(TestLocalBus {
                source: "mock",
                root: None,
            }) as SharedLocalNostrBus])
            .await;

        runtime.reset().await;

        assert!(
            closed.load(AtomicOrdering::Relaxed),
            "reset must close the peer session"
        );
        assert_eq!(runtime.connected_count.load(Ordering::Relaxed), 0);
        assert!(runtime.peers.read().await.is_empty());
        assert!(runtime.local_buses().await.is_empty());
    }

    /// A bus without a root is skipped and the root (plus source name) of the
    /// bus that has one is returned.
    #[tokio::test]
    async fn resolve_root_from_local_buses_returns_first_match() {
        let runtime = MeshRuntimeState::<TestSession>::new();
        let root = PeerRootEvent {
            hash: "ab".repeat(32),
            key: None,
            encrypted_key: None,
            self_encrypted_key: None,
            event_id: "event-1".to_string(),
            created_at: 1,
            peer_id: "bus-peer".to_string(),
        };
        runtime
            .set_local_buses(vec![
                Arc::new(TestLocalBus {
                    source: "empty",
                    root: None,
                }) as SharedLocalNostrBus,
                Arc::new(TestLocalBus {
                    source: "mock-bus",
                    root: Some(root.clone()),
                }) as SharedLocalNostrBus,
            ])
            .await;

        let resolved = runtime
            .resolve_root_from_local_buses_with_source("owner", "tree", Duration::from_millis(10))
            .await
            .expect("root");

        assert_eq!(resolved.0, "mock-bus");
        assert_eq!(resolved.1, root);
    }
}