1use crate::types::{MeshStats, MeshStoreConfig, PeerId};
11use crate::{
12 MeshReadSource, MeshRouter, MeshRoutingConfig, NostrRelayTransport, ProductionMeshStore,
13 SelectorSummary, SignalingTransport, WebRtcPeerLinkFactory,
14};
15use async_trait::async_trait;
16use hashtree_blossom::BlossomClient;
17use hashtree_core::{Hash, Store, StoreError};
18use nostr_sdk::prelude::*;
19use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
20use std::sync::Arc;
21use thiserror::Error;
22use tokio::sync::Mutex;
23use tokio::task::JoinHandle;
24use tokio::time::Duration;
25
26struct BlossomReadSource {
27 id: String,
28 client: BlossomClient,
29}
30
31#[async_trait]
32impl MeshReadSource for BlossomReadSource {
33 fn id(&self) -> &str {
34 &self.id
35 }
36
37 async fn get(&self, hash: &Hash) -> Option<Vec<u8>> {
38 self.client.download(&hex::encode(hash)).await.ok()
39 }
40}
41
42#[derive(Debug, Error)]
43pub enum MeshStoreError {
44 #[error("Mesh transport error: {0}")]
45 Transport(String),
46 #[error("No peers available")]
47 NoPeers,
48 #[error("Data not found")]
49 NotFound,
50 #[error("Store error: {0}")]
51 Store(#[from] StoreError),
52}
53
54struct LiveMesh<S: Store + Send + Sync + 'static> {
55 transport: Arc<NostrRelayTransport>,
56 store: Arc<ProductionMeshStore<S>>,
57 tasks: Vec<JoinHandle<()>>,
58}
59
60pub struct MeshStore<S: Store + Send + Sync + 'static> {
62 local_store: Arc<S>,
63 config: MeshStoreConfig,
64 live: Mutex<Option<LiveMesh<S>>>,
65 running: Arc<AtomicBool>,
66 stats_requests_made: AtomicU64,
67 stats_requests_fulfilled: AtomicU64,
68 stats_bytes_received: AtomicU64,
69}
70
71impl<S: Store + Send + Sync + 'static> MeshStore<S> {
72 pub fn new(local_store: Arc<S>, config: MeshStoreConfig) -> Self {
74 Self {
75 local_store,
76 config,
77 live: Mutex::new(None),
78 running: Arc::new(AtomicBool::new(false)),
79 stats_requests_made: AtomicU64::new(0),
80 stats_requests_fulfilled: AtomicU64::new(0),
81 stats_bytes_received: AtomicU64::new(0),
82 }
83 }
84
85 fn routing_config(&self) -> MeshRoutingConfig {
86 MeshRoutingConfig {
87 selection_strategy: self.config.request_selection_strategy,
88 fairness_enabled: self.config.request_fairness_enabled,
89 cashu_payment_weight: 0.0,
90 cashu_payment_default_block_threshold: 0,
91 cashu_accepted_mints: Vec::new(),
92 cashu_default_mint: None,
93 cashu_peer_suggested_mint_base_cap_sat: 0,
94 cashu_peer_suggested_mint_success_step_sat: 0,
95 cashu_peer_suggested_mint_receipt_step_sat: 0,
96 cashu_peer_suggested_mint_max_cap_sat: 0,
97 dispatch: self.config.request_dispatch,
98 response_behavior: Default::default(),
99 pubsub_scheduler: Default::default(),
100 pubsub_delivery_mode: self.config.pubsub_delivery_mode,
101 }
102 }
103
104 async fn live_store(&self) -> Option<Arc<ProductionMeshStore<S>>> {
105 self.live
106 .lock()
107 .await
108 .as_ref()
109 .map(|live| live.store.clone())
110 }
111
112 pub async fn start(&mut self, keys: Keys) -> Result<(), MeshStoreError> {
114 if self.running.load(Ordering::Relaxed) {
115 return Ok(());
116 }
117
118 let peer_id = PeerId::new(keys.public_key().to_hex()).to_string();
119 let transport = Arc::new(NostrRelayTransport::new(keys.clone(), self.config.debug));
120 transport
121 .connect(&self.config.relays)
122 .await
123 .map_err(|e| MeshStoreError::Transport(e.to_string()))?;
124
125 let factory = Arc::new(WebRtcPeerLinkFactory::default());
126 let mut router = MeshRouter::new(
127 peer_id.clone(),
128 transport.clone(),
129 factory,
130 self.config.pools.clone(),
131 self.config.debug,
132 );
133 if let Some(classifier_tx) = self.config.classifier_tx.clone() {
134 router.set_classifier(classifier_tx);
135 }
136
137 let store = Arc::new(ProductionMeshStore::new_with_routing(
138 self.local_store.clone(),
139 Arc::new(router),
140 Duration::from_millis(self.config.request_timeout_ms),
141 self.config.debug,
142 self.routing_config(),
143 ));
144 if !self.config.upstream_blossom_servers.is_empty() {
145 let timeout = Duration::from_millis(self.config.request_timeout_ms);
146 let sources: Vec<Arc<dyn MeshReadSource>> = self
147 .config
148 .upstream_blossom_servers
149 .iter()
150 .map(|server| {
151 Arc::new(BlossomReadSource {
152 id: format!("blossom:{server}"),
153 client: BlossomClient::new_empty(keys.clone())
154 .with_timeout(timeout)
155 .with_read_servers(vec![server.clone()]),
156 }) as Arc<dyn MeshReadSource>
157 })
158 .collect();
159 store.set_read_sources(sources).await;
160 }
161 store
162 .start()
163 .await
164 .map_err(|e| MeshStoreError::Transport(e.to_string()))?;
165
166 self.running.store(true, Ordering::Relaxed);
167
168 let running = self.running.clone();
169 let signaling_store = store.clone();
170 let signaling_transport = transport.clone();
171 let signaling_task = tokio::spawn(async move {
172 loop {
173 if !running.load(Ordering::Relaxed) {
174 break;
175 }
176
177 let Some(msg) = signaling_transport.recv().await else {
178 break;
179 };
180 if signaling_store.process_signaling(msg).await.is_err() {
181 continue;
182 }
183 }
184 });
185
186 let running = self.running.clone();
187 let data_store = store.clone();
188 let data_task = tokio::spawn(async move {
189 let mut ticker = tokio::time::interval(Duration::from_millis(10));
190 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
191 loop {
192 ticker.tick().await;
193 if !running.load(Ordering::Relaxed) {
194 break;
195 }
196 let _ = data_store.drain_available_data_messages().await;
197 }
198 });
199
200 let running = self.running.clone();
201 let hello_store = store.clone();
202 let hello_interval_ms = self.config.hello_interval_ms;
203 let hello_task = tokio::spawn(async move {
204 let mut ticker = tokio::time::interval(Duration::from_millis(hello_interval_ms));
205 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
206 loop {
207 ticker.tick().await;
208 if !running.load(Ordering::Relaxed) {
209 break;
210 }
211 let _ = hello_store.send_hello().await;
212 }
213 });
214
215 *self.live.lock().await = Some(LiveMesh {
216 transport,
217 store,
218 tasks: vec![signaling_task, data_task, hello_task],
219 });
220
221 Ok(())
222 }
223
224 pub async fn stop(&self) {
226 self.running.store(false, Ordering::Relaxed);
227
228 let live = self.live.lock().await.take();
229 let Some(live) = live else {
230 return;
231 };
232
233 live.store.stop().await;
234
235 let peer_ids = live.store.signaling().peer_ids().await;
236 for peer_id in peer_ids {
237 if let Some(channel) = live.store.signaling().get_channel(&peer_id).await {
238 channel.close().await;
239 }
240 }
241
242 for task in live.tasks {
243 task.abort();
244 }
245 live.transport.disconnect().await;
246 }
247
248 pub async fn stats(&self) -> MeshStats {
250 MeshStats {
251 connected_peers: self.peer_count().await,
252 pending_requests: 0,
253 bytes_sent: 0,
254 bytes_received: self.stats_bytes_received.load(Ordering::Relaxed),
255 requests_made: self.stats_requests_made.load(Ordering::Relaxed),
256 requests_fulfilled: self.stats_requests_fulfilled.load(Ordering::Relaxed),
257 }
258 }
259
260 pub async fn peer_count(&self) -> usize {
262 let Some(store) = self.live_store().await else {
263 return 0;
264 };
265 store.peer_count().await
266 }
267
268 pub async fn selector_summary(&self) -> SelectorSummary {
270 let Some(store) = self.live_store().await else {
271 return SelectorSummary::default();
272 };
273 store.selector_summary().await
274 }
275}
276
277#[async_trait]
278impl<S: Store + Send + Sync + 'static> Store for MeshStore<S> {
279 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
280 self.local_store.put(hash, data).await
281 }
282
283 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
284 if let Some(data) = self.local_store.get(hash).await? {
285 return Ok(Some(data));
286 }
287
288 self.stats_requests_made.fetch_add(1, Ordering::Relaxed);
289
290 let Some(store) = self.live_store().await else {
291 return Ok(None);
292 };
293
294 let result = store.get(hash).await?;
295 if let Some(ref data) = result {
296 self.stats_requests_fulfilled
297 .fetch_add(1, Ordering::Relaxed);
298 self.stats_bytes_received
299 .fetch_add(data.len() as u64, Ordering::Relaxed);
300 }
301 Ok(result)
302 }
303
304 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
305 self.local_store.has(hash).await
306 }
307
308 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
309 self.local_store.delete(hash).await
310 }
311}