// hashtree_network/store.rs

//! Default production mesh store wrapper.
//!
//! This is intentionally a thin composition layer around the shared routed
//! mesh store core. The reusable routing/retrieval behavior lives in
//! [`crate::mesh_store_core`]; this wrapper only plugs in the default production
//! transport pair:
//! - signaling: Nostr websocket relays
//! - direct links: real WebRTC data channels
9
10use crate::types::{MeshStats, MeshStoreConfig, PeerId};
11use crate::{
12    MeshRouter, MeshRoutingConfig, NostrRelayTransport, ProductionMeshStore, SelectorSummary,
13    SignalingTransport, WebRtcPeerLinkFactory,
14};
15use async_trait::async_trait;
16use hashtree_core::{Hash, Store, StoreError};
17use nostr_sdk::prelude::*;
18use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
19use std::sync::Arc;
20use thiserror::Error;
21use tokio::sync::Mutex;
22use tokio::task::JoinHandle;
23use tokio::time::Duration;
24
25#[derive(Debug, Error)]
26pub enum MeshStoreError {
27    #[error("Mesh transport error: {0}")]
28    Transport(String),
29    #[error("No peers available")]
30    NoPeers,
31    #[error("Data not found")]
32    NotFound,
33    #[error("Store error: {0}")]
34    Store(#[from] StoreError),
35}
36
37struct LiveMesh<S: Store + Send + Sync + 'static> {
38    transport: Arc<NostrRelayTransport>,
39    store: Arc<ProductionMeshStore<S>>,
40    tasks: Vec<JoinHandle<()>>,
41}
42
43/// Production mesh-backed store that uses the shared routed mesh core.
44pub struct MeshStore<S: Store + Send + Sync + 'static> {
45    local_store: Arc<S>,
46    config: MeshStoreConfig,
47    live: Mutex<Option<LiveMesh<S>>>,
48    running: Arc<AtomicBool>,
49    stats_requests_made: AtomicU64,
50    stats_requests_fulfilled: AtomicU64,
51    stats_bytes_received: AtomicU64,
52}
53
54impl<S: Store + Send + Sync + 'static> MeshStore<S> {
55    /// Create a new production mesh store wrapper.
56    pub fn new(local_store: Arc<S>, config: MeshStoreConfig) -> Self {
57        Self {
58            local_store,
59            config,
60            live: Mutex::new(None),
61            running: Arc::new(AtomicBool::new(false)),
62            stats_requests_made: AtomicU64::new(0),
63            stats_requests_fulfilled: AtomicU64::new(0),
64            stats_bytes_received: AtomicU64::new(0),
65        }
66    }
67
68    fn routing_config(&self) -> MeshRoutingConfig {
69        MeshRoutingConfig {
70            selection_strategy: self.config.request_selection_strategy,
71            fairness_enabled: self.config.request_fairness_enabled,
72            cashu_payment_weight: 0.0,
73            cashu_payment_default_block_threshold: 0,
74            cashu_accepted_mints: Vec::new(),
75            cashu_default_mint: None,
76            cashu_peer_suggested_mint_base_cap_sat: 0,
77            cashu_peer_suggested_mint_success_step_sat: 0,
78            cashu_peer_suggested_mint_receipt_step_sat: 0,
79            cashu_peer_suggested_mint_max_cap_sat: 0,
80            dispatch: self.config.request_dispatch,
81            response_behavior: Default::default(),
82        }
83    }
84
85    async fn live_store(&self) -> Option<Arc<ProductionMeshStore<S>>> {
86        self.live
87            .lock()
88            .await
89            .as_ref()
90            .map(|live| live.store.clone())
91    }
92
93    /// Start the production transport composition and shared mesh core.
94    pub async fn start(&mut self, keys: Keys) -> Result<(), MeshStoreError> {
95        if self.running.load(Ordering::Relaxed) {
96            return Ok(());
97        }
98
99        let peer_id = PeerId::new(keys.public_key().to_hex()).to_string();
100        let transport = Arc::new(NostrRelayTransport::new(keys, self.config.debug));
101        transport
102            .connect(&self.config.relays)
103            .await
104            .map_err(|e| MeshStoreError::Transport(e.to_string()))?;
105
106        let factory = Arc::new(WebRtcPeerLinkFactory::default());
107        let mut router = MeshRouter::new(
108            peer_id.clone(),
109            transport.clone(),
110            factory,
111            self.config.pools.clone(),
112            self.config.debug,
113        );
114        if let Some(classifier_tx) = self.config.classifier_tx.clone() {
115            router.set_classifier(classifier_tx);
116        }
117
118        let store = Arc::new(ProductionMeshStore::new_with_routing(
119            self.local_store.clone(),
120            Arc::new(router),
121            Duration::from_millis(self.config.request_timeout_ms),
122            self.config.debug,
123            self.routing_config(),
124        ));
125        store
126            .start()
127            .await
128            .map_err(|e| MeshStoreError::Transport(e.to_string()))?;
129
130        self.running.store(true, Ordering::Relaxed);
131
132        let running = self.running.clone();
133        let signaling_store = store.clone();
134        let signaling_transport = transport.clone();
135        let signaling_task = tokio::spawn(async move {
136            loop {
137                if !running.load(Ordering::Relaxed) {
138                    break;
139                }
140
141                let Some(msg) = signaling_transport.recv().await else {
142                    break;
143                };
144                if signaling_store.process_signaling(msg).await.is_err() {
145                    continue;
146                }
147            }
148        });
149
150        let running = self.running.clone();
151        let data_store = store.clone();
152        let data_task = tokio::spawn(async move {
153            let mut ticker = tokio::time::interval(Duration::from_millis(10));
154            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
155            loop {
156                ticker.tick().await;
157                if !running.load(Ordering::Relaxed) {
158                    break;
159                }
160                let _ = data_store.drain_available_data_messages().await;
161            }
162        });
163
164        let running = self.running.clone();
165        let hello_store = store.clone();
166        let hello_interval_ms = self.config.hello_interval_ms;
167        let hello_task = tokio::spawn(async move {
168            let mut ticker = tokio::time::interval(Duration::from_millis(hello_interval_ms));
169            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
170            loop {
171                ticker.tick().await;
172                if !running.load(Ordering::Relaxed) {
173                    break;
174                }
175                let _ = hello_store.send_hello().await;
176            }
177        });
178
179        *self.live.lock().await = Some(LiveMesh {
180            transport,
181            store,
182            tasks: vec![signaling_task, data_task, hello_task],
183        });
184
185        Ok(())
186    }
187
188    /// Stop the production transport composition and background tasks.
189    pub async fn stop(&self) {
190        self.running.store(false, Ordering::Relaxed);
191
192        let live = self.live.lock().await.take();
193        let Some(live) = live else {
194            return;
195        };
196
197        live.store.stop().await;
198
199        let peer_ids = live.store.signaling().peer_ids().await;
200        for peer_id in peer_ids {
201            if let Some(channel) = live.store.signaling().get_channel(&peer_id).await {
202                channel.close().await;
203            }
204        }
205
206        for task in live.tasks {
207            task.abort();
208        }
209        live.transport.disconnect().await;
210    }
211
212    /// Snapshot high-level store statistics.
213    pub async fn stats(&self) -> MeshStats {
214        MeshStats {
215            connected_peers: self.peer_count().await,
216            pending_requests: 0,
217            bytes_sent: 0,
218            bytes_received: self.stats_bytes_received.load(Ordering::Relaxed),
219            requests_made: self.stats_requests_made.load(Ordering::Relaxed),
220            requests_fulfilled: self.stats_requests_fulfilled.load(Ordering::Relaxed),
221        }
222    }
223
224    /// Get the number of currently connected peers.
225    pub async fn peer_count(&self) -> usize {
226        let Some(store) = self.live_store().await else {
227            return 0;
228        };
229        store.peer_count().await
230    }
231
232    /// Get adaptive routing selector summary statistics.
233    pub async fn selector_summary(&self) -> SelectorSummary {
234        let Some(store) = self.live_store().await else {
235            return SelectorSummary::default();
236        };
237        store.selector_summary().await
238    }
239}
240
241#[async_trait]
242impl<S: Store + Send + Sync + 'static> Store for MeshStore<S> {
243    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
244        self.local_store.put(hash, data).await
245    }
246
247    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
248        if let Some(data) = self.local_store.get(hash).await? {
249            return Ok(Some(data));
250        }
251
252        self.stats_requests_made.fetch_add(1, Ordering::Relaxed);
253
254        let Some(store) = self.live_store().await else {
255            return Ok(None);
256        };
257
258        let result = store.get(hash).await?;
259        if let Some(ref data) = result {
260            self.stats_requests_fulfilled
261                .fetch_add(1, Ordering::Relaxed);
262            self.stats_bytes_received
263                .fetch_add(data.len() as u64, Ordering::Relaxed);
264        }
265        Ok(result)
266    }
267
268    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
269        self.local_store.has(hash).await
270    }
271
272    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
273        self.local_store.delete(hash).await
274    }
275}