Skip to main content

hashtree_network/
store.rs

//! Default production mesh store wrapper.
//!
//! This is intentionally a thin composition layer around the shared routed
//! mesh store core. The reusable routing/retrieval behavior lives in
//! [`crate::mesh_store_core`]; this wrapper only plugs in the default production
//! transport pair:
//! - signaling: Nostr websocket relays
//! - direct links: real WebRTC data channels
9
10use crate::types::{MeshStats, MeshStoreConfig, PeerId};
11use crate::{
12    MeshReadSource, MeshRouter, MeshRoutingConfig, NostrRelayTransport, ProductionMeshStore,
13    SelectorSummary, SignalingTransport, WebRtcPeerLinkFactory,
14};
15use async_trait::async_trait;
16use hashtree_blossom::BlossomClient;
17use hashtree_core::{Hash, Store, StoreError};
18use nostr_sdk::prelude::*;
19use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
20use std::sync::Arc;
21use thiserror::Error;
22use tokio::sync::Mutex;
23use tokio::task::JoinHandle;
24use tokio::time::Duration;
25
26struct BlossomReadSource {
27    id: String,
28    client: BlossomClient,
29}
30
31#[async_trait]
32impl MeshReadSource for BlossomReadSource {
33    fn id(&self) -> &str {
34        &self.id
35    }
36
37    async fn get(&self, hash: &Hash) -> Option<Vec<u8>> {
38        self.client.download(&hex::encode(hash)).await.ok()
39    }
40}
41
42#[derive(Debug, Error)]
43pub enum MeshStoreError {
44    #[error("Mesh transport error: {0}")]
45    Transport(String),
46    #[error("No peers available")]
47    NoPeers,
48    #[error("Data not found")]
49    NotFound,
50    #[error("Store error: {0}")]
51    Store(#[from] StoreError),
52}
53
54struct LiveMesh<S: Store + Send + Sync + 'static> {
55    transport: Arc<NostrRelayTransport>,
56    store: Arc<ProductionMeshStore<S>>,
57    tasks: Vec<JoinHandle<()>>,
58}
59
60/// Production mesh-backed store that uses the shared routed mesh core.
61pub struct MeshStore<S: Store + Send + Sync + 'static> {
62    local_store: Arc<S>,
63    config: MeshStoreConfig,
64    live: Mutex<Option<LiveMesh<S>>>,
65    running: Arc<AtomicBool>,
66    stats_requests_made: AtomicU64,
67    stats_requests_fulfilled: AtomicU64,
68    stats_bytes_received: AtomicU64,
69}
70
71impl<S: Store + Send + Sync + 'static> MeshStore<S> {
72    /// Create a new production mesh store wrapper.
73    pub fn new(local_store: Arc<S>, config: MeshStoreConfig) -> Self {
74        Self {
75            local_store,
76            config,
77            live: Mutex::new(None),
78            running: Arc::new(AtomicBool::new(false)),
79            stats_requests_made: AtomicU64::new(0),
80            stats_requests_fulfilled: AtomicU64::new(0),
81            stats_bytes_received: AtomicU64::new(0),
82        }
83    }
84
85    fn routing_config(&self) -> MeshRoutingConfig {
86        MeshRoutingConfig {
87            selection_strategy: self.config.request_selection_strategy,
88            fairness_enabled: self.config.request_fairness_enabled,
89            cashu_payment_weight: 0.0,
90            cashu_payment_default_block_threshold: 0,
91            cashu_accepted_mints: Vec::new(),
92            cashu_default_mint: None,
93            cashu_peer_suggested_mint_base_cap_sat: 0,
94            cashu_peer_suggested_mint_success_step_sat: 0,
95            cashu_peer_suggested_mint_receipt_step_sat: 0,
96            cashu_peer_suggested_mint_max_cap_sat: 0,
97            dispatch: self.config.request_dispatch,
98            response_behavior: Default::default(),
99            pubsub_scheduler: Default::default(),
100            pubsub_delivery_mode: self.config.pubsub_delivery_mode,
101        }
102    }
103
104    async fn live_store(&self) -> Option<Arc<ProductionMeshStore<S>>> {
105        self.live
106            .lock()
107            .await
108            .as_ref()
109            .map(|live| live.store.clone())
110    }
111
112    /// Start the production transport composition and shared mesh core.
113    pub async fn start(&mut self, keys: Keys) -> Result<(), MeshStoreError> {
114        if self.running.load(Ordering::Relaxed) {
115            return Ok(());
116        }
117
118        let peer_id = PeerId::new(keys.public_key().to_hex()).to_string();
119        let transport = Arc::new(NostrRelayTransport::new(keys.clone(), self.config.debug));
120        transport
121            .connect(&self.config.relays)
122            .await
123            .map_err(|e| MeshStoreError::Transport(e.to_string()))?;
124
125        let factory = Arc::new(WebRtcPeerLinkFactory::default());
126        let mut router = MeshRouter::new(
127            peer_id.clone(),
128            transport.clone(),
129            factory,
130            self.config.pools.clone(),
131            self.config.debug,
132        );
133        if let Some(classifier_tx) = self.config.classifier_tx.clone() {
134            router.set_classifier(classifier_tx);
135        }
136
137        let store = Arc::new(ProductionMeshStore::new_with_routing(
138            self.local_store.clone(),
139            Arc::new(router),
140            Duration::from_millis(self.config.request_timeout_ms),
141            self.config.debug,
142            self.routing_config(),
143        ));
144        if !self.config.upstream_blossom_servers.is_empty() {
145            let timeout = Duration::from_millis(self.config.request_timeout_ms);
146            let sources: Vec<Arc<dyn MeshReadSource>> = self
147                .config
148                .upstream_blossom_servers
149                .iter()
150                .map(|server| {
151                    Arc::new(BlossomReadSource {
152                        id: format!("blossom:{server}"),
153                        client: BlossomClient::new_empty(keys.clone())
154                            .with_timeout(timeout)
155                            .with_read_servers(vec![server.clone()]),
156                    }) as Arc<dyn MeshReadSource>
157                })
158                .collect();
159            store.set_read_sources(sources).await;
160        }
161        store
162            .start()
163            .await
164            .map_err(|e| MeshStoreError::Transport(e.to_string()))?;
165
166        self.running.store(true, Ordering::Relaxed);
167
168        let running = self.running.clone();
169        let signaling_store = store.clone();
170        let signaling_transport = transport.clone();
171        let signaling_task = tokio::spawn(async move {
172            loop {
173                if !running.load(Ordering::Relaxed) {
174                    break;
175                }
176
177                let Some(msg) = signaling_transport.recv().await else {
178                    break;
179                };
180                if signaling_store.process_signaling(msg).await.is_err() {
181                    continue;
182                }
183            }
184        });
185
186        let running = self.running.clone();
187        let data_store = store.clone();
188        let data_task = tokio::spawn(async move {
189            let mut ticker = tokio::time::interval(Duration::from_millis(10));
190            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
191            loop {
192                ticker.tick().await;
193                if !running.load(Ordering::Relaxed) {
194                    break;
195                }
196                let _ = data_store.drain_available_data_messages().await;
197            }
198        });
199
200        let running = self.running.clone();
201        let hello_store = store.clone();
202        let hello_interval_ms = self.config.hello_interval_ms;
203        let hello_task = tokio::spawn(async move {
204            let mut ticker = tokio::time::interval(Duration::from_millis(hello_interval_ms));
205            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
206            loop {
207                ticker.tick().await;
208                if !running.load(Ordering::Relaxed) {
209                    break;
210                }
211                let _ = hello_store.send_hello().await;
212            }
213        });
214
215        *self.live.lock().await = Some(LiveMesh {
216            transport,
217            store,
218            tasks: vec![signaling_task, data_task, hello_task],
219        });
220
221        Ok(())
222    }
223
224    /// Stop the production transport composition and background tasks.
225    pub async fn stop(&self) {
226        self.running.store(false, Ordering::Relaxed);
227
228        let live = self.live.lock().await.take();
229        let Some(live) = live else {
230            return;
231        };
232
233        live.store.stop().await;
234
235        let peer_ids = live.store.signaling().peer_ids().await;
236        for peer_id in peer_ids {
237            if let Some(channel) = live.store.signaling().get_channel(&peer_id).await {
238                channel.close().await;
239            }
240        }
241
242        for task in live.tasks {
243            task.abort();
244        }
245        live.transport.disconnect().await;
246    }
247
248    /// Snapshot high-level store statistics.
249    pub async fn stats(&self) -> MeshStats {
250        MeshStats {
251            connected_peers: self.peer_count().await,
252            pending_requests: 0,
253            bytes_sent: 0,
254            bytes_received: self.stats_bytes_received.load(Ordering::Relaxed),
255            requests_made: self.stats_requests_made.load(Ordering::Relaxed),
256            requests_fulfilled: self.stats_requests_fulfilled.load(Ordering::Relaxed),
257        }
258    }
259
260    /// Get the number of currently connected peers.
261    pub async fn peer_count(&self) -> usize {
262        let Some(store) = self.live_store().await else {
263            return 0;
264        };
265        store.peer_count().await
266    }
267
268    /// Get adaptive routing selector summary statistics.
269    pub async fn selector_summary(&self) -> SelectorSummary {
270        let Some(store) = self.live_store().await else {
271            return SelectorSummary::default();
272        };
273        store.selector_summary().await
274    }
275}
276
277#[async_trait]
278impl<S: Store + Send + Sync + 'static> Store for MeshStore<S> {
279    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
280        self.local_store.put(hash, data).await
281    }
282
283    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
284        if let Some(data) = self.local_store.get(hash).await? {
285            return Ok(Some(data));
286        }
287
288        self.stats_requests_made.fetch_add(1, Ordering::Relaxed);
289
290        let Some(store) = self.live_store().await else {
291            return Ok(None);
292        };
293
294        let result = store.get(hash).await?;
295        if let Some(ref data) = result {
296            self.stats_requests_fulfilled
297                .fetch_add(1, Ordering::Relaxed);
298            self.stats_bytes_received
299                .fetch_add(data.len() as u64, Ordering::Relaxed);
300        }
301        Ok(result)
302    }
303
304    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
305        self.local_store.has(hash).await
306    }
307
308    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
309        self.local_store.delete(hash).await
310    }
311}