1use crate::types::{MeshStats, MeshStoreConfig, PeerId};
11use crate::{
12 MeshReadSource, MeshRouter, MeshRoutingConfig, NostrRelayTransport, ProductionMeshStore,
13 SelectorSummary, SignalingTransport, WebRtcPeerLinkFactory,
14};
15use async_trait::async_trait;
16use hashtree_blossom::BlossomClient;
17use hashtree_core::{Hash, Store, StoreError};
18use nostr_sdk::prelude::*;
19use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
20use std::sync::Arc;
21use thiserror::Error;
22use tokio::sync::Mutex;
23use tokio::task::JoinHandle;
24use tokio::time::Duration;
25
/// Mesh read source backed by a [`BlossomClient`].
///
/// Instances are built in `MeshStore::start`, one per configured upstream
/// Blossom server.
struct BlossomReadSource {
    /// Identifier returned by `MeshReadSource::id`; `start` formats it as
    /// `blossom:<server>`.
    id: String,
    /// Client used to download blobs by their hex-encoded hash.
    client: BlossomClient,
}
30
31#[async_trait]
32impl MeshReadSource for BlossomReadSource {
33 fn id(&self) -> &str {
34 &self.id
35 }
36
37 async fn get(&self, hash: &Hash) -> Option<Vec<u8>> {
38 self.client.download(&hex::encode(hash)).await.ok()
39 }
40}
41
/// Errors surfaced by the mesh store.
#[derive(Debug, Error)]
pub enum MeshStoreError {
    /// Connecting the relay transport or starting the mesh store failed; the
    /// payload is the underlying error rendered as text.
    #[error("Mesh transport error: {0}")]
    Transport(String),
    /// No peers are available.
    // NOTE(review): not constructed anywhere in the visible part of this file.
    #[error("No peers available")]
    NoPeers,
    /// The requested data could not be found.
    // NOTE(review): not constructed anywhere in the visible part of this file.
    #[error("Data not found")]
    NotFound,
    /// Error from the underlying [`Store`], converted automatically via `From`.
    #[error("Store error: {0}")]
    Store(#[from] StoreError),
}
53
54struct LiveMesh<S: Store + Send + Sync + 'static> {
55 transport: Arc<NostrRelayTransport>,
56 store: Arc<ProductionMeshStore<S>>,
57 tasks: Vec<JoinHandle<()>>,
58}
59
60pub struct MeshStore<S: Store + Send + Sync + 'static> {
62 local_store: Arc<S>,
63 config: MeshStoreConfig,
64 live: Mutex<Option<LiveMesh<S>>>,
65 running: Arc<AtomicBool>,
66 stats_requests_made: AtomicU64,
67 stats_requests_fulfilled: AtomicU64,
68 stats_bytes_received: AtomicU64,
69}
70
71impl<S: Store + Send + Sync + 'static> MeshStore<S> {
72 pub fn new(local_store: Arc<S>, config: MeshStoreConfig) -> Self {
74 Self {
75 local_store,
76 config,
77 live: Mutex::new(None),
78 running: Arc::new(AtomicBool::new(false)),
79 stats_requests_made: AtomicU64::new(0),
80 stats_requests_fulfilled: AtomicU64::new(0),
81 stats_bytes_received: AtomicU64::new(0),
82 }
83 }
84
85 fn routing_config(&self) -> MeshRoutingConfig {
86 MeshRoutingConfig {
87 selection_strategy: self.config.request_selection_strategy,
88 fairness_enabled: self.config.request_fairness_enabled,
89 cashu_payment_weight: 0.0,
90 cashu_payment_default_block_threshold: 0,
91 cashu_accepted_mints: Vec::new(),
92 cashu_default_mint: None,
93 cashu_peer_suggested_mint_base_cap_sat: 0,
94 cashu_peer_suggested_mint_success_step_sat: 0,
95 cashu_peer_suggested_mint_receipt_step_sat: 0,
96 cashu_peer_suggested_mint_max_cap_sat: 0,
97 dispatch: self.config.request_dispatch,
98 response_behavior: Default::default(),
99 }
100 }
101
102 async fn live_store(&self) -> Option<Arc<ProductionMeshStore<S>>> {
103 self.live
104 .lock()
105 .await
106 .as_ref()
107 .map(|live| live.store.clone())
108 }
109
110 pub async fn start(&mut self, keys: Keys) -> Result<(), MeshStoreError> {
112 if self.running.load(Ordering::Relaxed) {
113 return Ok(());
114 }
115
116 let peer_id = PeerId::new(keys.public_key().to_hex()).to_string();
117 let transport = Arc::new(NostrRelayTransport::new(keys.clone(), self.config.debug));
118 transport
119 .connect(&self.config.relays)
120 .await
121 .map_err(|e| MeshStoreError::Transport(e.to_string()))?;
122
123 let factory = Arc::new(WebRtcPeerLinkFactory::default());
124 let mut router = MeshRouter::new(
125 peer_id.clone(),
126 transport.clone(),
127 factory,
128 self.config.pools.clone(),
129 self.config.debug,
130 );
131 if let Some(classifier_tx) = self.config.classifier_tx.clone() {
132 router.set_classifier(classifier_tx);
133 }
134
135 let store = Arc::new(ProductionMeshStore::new_with_routing(
136 self.local_store.clone(),
137 Arc::new(router),
138 Duration::from_millis(self.config.request_timeout_ms),
139 self.config.debug,
140 self.routing_config(),
141 ));
142 if !self.config.upstream_blossom_servers.is_empty() {
143 let timeout = Duration::from_millis(self.config.request_timeout_ms);
144 let sources: Vec<Arc<dyn MeshReadSource>> = self
145 .config
146 .upstream_blossom_servers
147 .iter()
148 .map(|server| {
149 Arc::new(BlossomReadSource {
150 id: format!("blossom:{server}"),
151 client: BlossomClient::new_empty(keys.clone())
152 .with_timeout(timeout)
153 .with_read_servers(vec![server.clone()]),
154 }) as Arc<dyn MeshReadSource>
155 })
156 .collect();
157 store.set_read_sources(sources).await;
158 }
159 store
160 .start()
161 .await
162 .map_err(|e| MeshStoreError::Transport(e.to_string()))?;
163
164 self.running.store(true, Ordering::Relaxed);
165
166 let running = self.running.clone();
167 let signaling_store = store.clone();
168 let signaling_transport = transport.clone();
169 let signaling_task = tokio::spawn(async move {
170 loop {
171 if !running.load(Ordering::Relaxed) {
172 break;
173 }
174
175 let Some(msg) = signaling_transport.recv().await else {
176 break;
177 };
178 if signaling_store.process_signaling(msg).await.is_err() {
179 continue;
180 }
181 }
182 });
183
184 let running = self.running.clone();
185 let data_store = store.clone();
186 let data_task = tokio::spawn(async move {
187 let mut ticker = tokio::time::interval(Duration::from_millis(10));
188 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
189 loop {
190 ticker.tick().await;
191 if !running.load(Ordering::Relaxed) {
192 break;
193 }
194 let _ = data_store.drain_available_data_messages().await;
195 }
196 });
197
198 let running = self.running.clone();
199 let hello_store = store.clone();
200 let hello_interval_ms = self.config.hello_interval_ms;
201 let hello_task = tokio::spawn(async move {
202 let mut ticker = tokio::time::interval(Duration::from_millis(hello_interval_ms));
203 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
204 loop {
205 ticker.tick().await;
206 if !running.load(Ordering::Relaxed) {
207 break;
208 }
209 let _ = hello_store.send_hello().await;
210 }
211 });
212
213 *self.live.lock().await = Some(LiveMesh {
214 transport,
215 store,
216 tasks: vec![signaling_task, data_task, hello_task],
217 });
218
219 Ok(())
220 }
221
222 pub async fn stop(&self) {
224 self.running.store(false, Ordering::Relaxed);
225
226 let live = self.live.lock().await.take();
227 let Some(live) = live else {
228 return;
229 };
230
231 live.store.stop().await;
232
233 let peer_ids = live.store.signaling().peer_ids().await;
234 for peer_id in peer_ids {
235 if let Some(channel) = live.store.signaling().get_channel(&peer_id).await {
236 channel.close().await;
237 }
238 }
239
240 for task in live.tasks {
241 task.abort();
242 }
243 live.transport.disconnect().await;
244 }
245
246 pub async fn stats(&self) -> MeshStats {
248 MeshStats {
249 connected_peers: self.peer_count().await,
250 pending_requests: 0,
251 bytes_sent: 0,
252 bytes_received: self.stats_bytes_received.load(Ordering::Relaxed),
253 requests_made: self.stats_requests_made.load(Ordering::Relaxed),
254 requests_fulfilled: self.stats_requests_fulfilled.load(Ordering::Relaxed),
255 }
256 }
257
258 pub async fn peer_count(&self) -> usize {
260 let Some(store) = self.live_store().await else {
261 return 0;
262 };
263 store.peer_count().await
264 }
265
266 pub async fn selector_summary(&self) -> SelectorSummary {
268 let Some(store) = self.live_store().await else {
269 return SelectorSummary::default();
270 };
271 store.selector_summary().await
272 }
273}
274
275#[async_trait]
276impl<S: Store + Send + Sync + 'static> Store for MeshStore<S> {
277 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
278 self.local_store.put(hash, data).await
279 }
280
281 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
282 if let Some(data) = self.local_store.get(hash).await? {
283 return Ok(Some(data));
284 }
285
286 self.stats_requests_made.fetch_add(1, Ordering::Relaxed);
287
288 let Some(store) = self.live_store().await else {
289 return Ok(None);
290 };
291
292 let result = store.get(hash).await?;
293 if let Some(ref data) = result {
294 self.stats_requests_fulfilled
295 .fetch_add(1, Ordering::Relaxed);
296 self.stats_bytes_received
297 .fetch_add(data.len() as u64, Ordering::Relaxed);
298 }
299 Ok(result)
300 }
301
302 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
303 self.local_store.has(hash).await
304 }
305
306 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
307 self.local_store.delete(hash).await
308 }
309}