Skip to main content

hashtree_network/
store.rs

//! Default production mesh store wrapper.
//!
//! This is intentionally a thin composition layer around the shared routed
//! mesh store core. The reusable routing/retrieval behavior lives in
//! [`crate::mesh_store_core`]; this wrapper only plugs in the default production
//! transport pair:
//! - signaling: Nostr websocket relays
//! - direct links: real WebRTC data channels
9
10use crate::types::{MeshStats, MeshStoreConfig, PeerId};
11use crate::{
12    MeshReadSource, MeshRouter, MeshRoutingConfig, NostrRelayTransport, ProductionMeshStore,
13    SelectorSummary, SignalingTransport, WebRtcPeerLinkFactory,
14};
15use async_trait::async_trait;
16use hashtree_blossom::BlossomClient;
17use hashtree_core::{Hash, Store, StoreError};
18use nostr_sdk::prelude::*;
19use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
20use std::sync::Arc;
21use thiserror::Error;
22use tokio::sync::Mutex;
23use tokio::task::JoinHandle;
24use tokio::time::Duration;
25
26struct BlossomReadSource {
27    id: String,
28    client: BlossomClient,
29}
30
31#[async_trait]
32impl MeshReadSource for BlossomReadSource {
33    fn id(&self) -> &str {
34        &self.id
35    }
36
37    async fn get(&self, hash: &Hash) -> Option<Vec<u8>> {
38        self.client.download(&hex::encode(hash)).await.ok()
39    }
40}
41
42#[derive(Debug, Error)]
43pub enum MeshStoreError {
44    #[error("Mesh transport error: {0}")]
45    Transport(String),
46    #[error("No peers available")]
47    NoPeers,
48    #[error("Data not found")]
49    NotFound,
50    #[error("Store error: {0}")]
51    Store(#[from] StoreError),
52}
53
54struct LiveMesh<S: Store + Send + Sync + 'static> {
55    transport: Arc<NostrRelayTransport>,
56    store: Arc<ProductionMeshStore<S>>,
57    tasks: Vec<JoinHandle<()>>,
58}
59
60/// Production mesh-backed store that uses the shared routed mesh core.
61pub struct MeshStore<S: Store + Send + Sync + 'static> {
62    local_store: Arc<S>,
63    config: MeshStoreConfig,
64    live: Mutex<Option<LiveMesh<S>>>,
65    running: Arc<AtomicBool>,
66    stats_requests_made: AtomicU64,
67    stats_requests_fulfilled: AtomicU64,
68    stats_bytes_received: AtomicU64,
69}
70
71impl<S: Store + Send + Sync + 'static> MeshStore<S> {
72    /// Create a new production mesh store wrapper.
73    pub fn new(local_store: Arc<S>, config: MeshStoreConfig) -> Self {
74        Self {
75            local_store,
76            config,
77            live: Mutex::new(None),
78            running: Arc::new(AtomicBool::new(false)),
79            stats_requests_made: AtomicU64::new(0),
80            stats_requests_fulfilled: AtomicU64::new(0),
81            stats_bytes_received: AtomicU64::new(0),
82        }
83    }
84
85    fn routing_config(&self) -> MeshRoutingConfig {
86        MeshRoutingConfig {
87            selection_strategy: self.config.request_selection_strategy,
88            fairness_enabled: self.config.request_fairness_enabled,
89            cashu_payment_weight: 0.0,
90            cashu_payment_default_block_threshold: 0,
91            cashu_accepted_mints: Vec::new(),
92            cashu_default_mint: None,
93            cashu_peer_suggested_mint_base_cap_sat: 0,
94            cashu_peer_suggested_mint_success_step_sat: 0,
95            cashu_peer_suggested_mint_receipt_step_sat: 0,
96            cashu_peer_suggested_mint_max_cap_sat: 0,
97            dispatch: self.config.request_dispatch,
98            response_behavior: Default::default(),
99        }
100    }
101
102    async fn live_store(&self) -> Option<Arc<ProductionMeshStore<S>>> {
103        self.live
104            .lock()
105            .await
106            .as_ref()
107            .map(|live| live.store.clone())
108    }
109
110    /// Start the production transport composition and shared mesh core.
111    pub async fn start(&mut self, keys: Keys) -> Result<(), MeshStoreError> {
112        if self.running.load(Ordering::Relaxed) {
113            return Ok(());
114        }
115
116        let peer_id = PeerId::new(keys.public_key().to_hex()).to_string();
117        let transport = Arc::new(NostrRelayTransport::new(keys.clone(), self.config.debug));
118        transport
119            .connect(&self.config.relays)
120            .await
121            .map_err(|e| MeshStoreError::Transport(e.to_string()))?;
122
123        let factory = Arc::new(WebRtcPeerLinkFactory::default());
124        let mut router = MeshRouter::new(
125            peer_id.clone(),
126            transport.clone(),
127            factory,
128            self.config.pools.clone(),
129            self.config.debug,
130        );
131        if let Some(classifier_tx) = self.config.classifier_tx.clone() {
132            router.set_classifier(classifier_tx);
133        }
134
135        let store = Arc::new(ProductionMeshStore::new_with_routing(
136            self.local_store.clone(),
137            Arc::new(router),
138            Duration::from_millis(self.config.request_timeout_ms),
139            self.config.debug,
140            self.routing_config(),
141        ));
142        if !self.config.upstream_blossom_servers.is_empty() {
143            let timeout = Duration::from_millis(self.config.request_timeout_ms);
144            let sources: Vec<Arc<dyn MeshReadSource>> = self
145                .config
146                .upstream_blossom_servers
147                .iter()
148                .map(|server| {
149                    Arc::new(BlossomReadSource {
150                        id: format!("blossom:{server}"),
151                        client: BlossomClient::new_empty(keys.clone())
152                            .with_timeout(timeout)
153                            .with_read_servers(vec![server.clone()]),
154                    }) as Arc<dyn MeshReadSource>
155                })
156                .collect();
157            store.set_read_sources(sources).await;
158        }
159        store
160            .start()
161            .await
162            .map_err(|e| MeshStoreError::Transport(e.to_string()))?;
163
164        self.running.store(true, Ordering::Relaxed);
165
166        let running = self.running.clone();
167        let signaling_store = store.clone();
168        let signaling_transport = transport.clone();
169        let signaling_task = tokio::spawn(async move {
170            loop {
171                if !running.load(Ordering::Relaxed) {
172                    break;
173                }
174
175                let Some(msg) = signaling_transport.recv().await else {
176                    break;
177                };
178                if signaling_store.process_signaling(msg).await.is_err() {
179                    continue;
180                }
181            }
182        });
183
184        let running = self.running.clone();
185        let data_store = store.clone();
186        let data_task = tokio::spawn(async move {
187            let mut ticker = tokio::time::interval(Duration::from_millis(10));
188            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
189            loop {
190                ticker.tick().await;
191                if !running.load(Ordering::Relaxed) {
192                    break;
193                }
194                let _ = data_store.drain_available_data_messages().await;
195            }
196        });
197
198        let running = self.running.clone();
199        let hello_store = store.clone();
200        let hello_interval_ms = self.config.hello_interval_ms;
201        let hello_task = tokio::spawn(async move {
202            let mut ticker = tokio::time::interval(Duration::from_millis(hello_interval_ms));
203            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
204            loop {
205                ticker.tick().await;
206                if !running.load(Ordering::Relaxed) {
207                    break;
208                }
209                let _ = hello_store.send_hello().await;
210            }
211        });
212
213        *self.live.lock().await = Some(LiveMesh {
214            transport,
215            store,
216            tasks: vec![signaling_task, data_task, hello_task],
217        });
218
219        Ok(())
220    }
221
222    /// Stop the production transport composition and background tasks.
223    pub async fn stop(&self) {
224        self.running.store(false, Ordering::Relaxed);
225
226        let live = self.live.lock().await.take();
227        let Some(live) = live else {
228            return;
229        };
230
231        live.store.stop().await;
232
233        let peer_ids = live.store.signaling().peer_ids().await;
234        for peer_id in peer_ids {
235            if let Some(channel) = live.store.signaling().get_channel(&peer_id).await {
236                channel.close().await;
237            }
238        }
239
240        for task in live.tasks {
241            task.abort();
242        }
243        live.transport.disconnect().await;
244    }
245
246    /// Snapshot high-level store statistics.
247    pub async fn stats(&self) -> MeshStats {
248        MeshStats {
249            connected_peers: self.peer_count().await,
250            pending_requests: 0,
251            bytes_sent: 0,
252            bytes_received: self.stats_bytes_received.load(Ordering::Relaxed),
253            requests_made: self.stats_requests_made.load(Ordering::Relaxed),
254            requests_fulfilled: self.stats_requests_fulfilled.load(Ordering::Relaxed),
255        }
256    }
257
258    /// Get the number of currently connected peers.
259    pub async fn peer_count(&self) -> usize {
260        let Some(store) = self.live_store().await else {
261            return 0;
262        };
263        store.peer_count().await
264    }
265
266    /// Get adaptive routing selector summary statistics.
267    pub async fn selector_summary(&self) -> SelectorSummary {
268        let Some(store) = self.live_store().await else {
269            return SelectorSummary::default();
270        };
271        store.selector_summary().await
272    }
273}
274
275#[async_trait]
276impl<S: Store + Send + Sync + 'static> Store for MeshStore<S> {
277    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
278        self.local_store.put(hash, data).await
279    }
280
281    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
282        if let Some(data) = self.local_store.get(hash).await? {
283            return Ok(Some(data));
284        }
285
286        self.stats_requests_made.fetch_add(1, Ordering::Relaxed);
287
288        let Some(store) = self.live_store().await else {
289            return Ok(None);
290        };
291
292        let result = store.get(hash).await?;
293        if let Some(ref data) = result {
294            self.stats_requests_fulfilled
295                .fetch_add(1, Ordering::Relaxed);
296            self.stats_bytes_received
297                .fetch_add(data.len() as u64, Ordering::Relaxed);
298        }
299        Ok(result)
300    }
301
302    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
303        self.local_store.has(hash).await
304    }
305
306    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
307        self.local_store.delete(hash).await
308    }
309}