1use crate::types::{MeshStats, MeshStoreConfig, PeerId};
11use crate::{
12 MeshRouter, MeshRoutingConfig, NostrRelayTransport, ProductionMeshStore, SelectorSummary,
13 SignalingTransport, WebRtcPeerLinkFactory,
14};
15use async_trait::async_trait;
16use hashtree_core::{Hash, Store, StoreError};
17use nostr_sdk::prelude::*;
18use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
19use std::sync::Arc;
20use thiserror::Error;
21use tokio::sync::Mutex;
22use tokio::task::JoinHandle;
23use tokio::time::Duration;
24
25#[derive(Debug, Error)]
26pub enum MeshStoreError {
27 #[error("Mesh transport error: {0}")]
28 Transport(String),
29 #[error("No peers available")]
30 NoPeers,
31 #[error("Data not found")]
32 NotFound,
33 #[error("Store error: {0}")]
34 Store(#[from] StoreError),
35}
36
37struct LiveMesh<S: Store + Send + Sync + 'static> {
38 transport: Arc<NostrRelayTransport>,
39 store: Arc<ProductionMeshStore<S>>,
40 tasks: Vec<JoinHandle<()>>,
41}
42
43pub struct MeshStore<S: Store + Send + Sync + 'static> {
45 local_store: Arc<S>,
46 config: MeshStoreConfig,
47 live: Mutex<Option<LiveMesh<S>>>,
48 running: Arc<AtomicBool>,
49 stats_requests_made: AtomicU64,
50 stats_requests_fulfilled: AtomicU64,
51 stats_bytes_received: AtomicU64,
52}
53
54impl<S: Store + Send + Sync + 'static> MeshStore<S> {
55 pub fn new(local_store: Arc<S>, config: MeshStoreConfig) -> Self {
57 Self {
58 local_store,
59 config,
60 live: Mutex::new(None),
61 running: Arc::new(AtomicBool::new(false)),
62 stats_requests_made: AtomicU64::new(0),
63 stats_requests_fulfilled: AtomicU64::new(0),
64 stats_bytes_received: AtomicU64::new(0),
65 }
66 }
67
68 fn routing_config(&self) -> MeshRoutingConfig {
69 MeshRoutingConfig {
70 selection_strategy: self.config.request_selection_strategy,
71 fairness_enabled: self.config.request_fairness_enabled,
72 cashu_payment_weight: 0.0,
73 cashu_payment_default_block_threshold: 0,
74 cashu_accepted_mints: Vec::new(),
75 cashu_default_mint: None,
76 cashu_peer_suggested_mint_base_cap_sat: 0,
77 cashu_peer_suggested_mint_success_step_sat: 0,
78 cashu_peer_suggested_mint_receipt_step_sat: 0,
79 cashu_peer_suggested_mint_max_cap_sat: 0,
80 dispatch: self.config.request_dispatch,
81 response_behavior: Default::default(),
82 }
83 }
84
85 async fn live_store(&self) -> Option<Arc<ProductionMeshStore<S>>> {
86 self.live
87 .lock()
88 .await
89 .as_ref()
90 .map(|live| live.store.clone())
91 }
92
93 pub async fn start(&mut self, keys: Keys) -> Result<(), MeshStoreError> {
95 if self.running.load(Ordering::Relaxed) {
96 return Ok(());
97 }
98
99 let peer_id = PeerId::new(keys.public_key().to_hex()).to_string();
100 let transport = Arc::new(NostrRelayTransport::new(keys, self.config.debug));
101 transport
102 .connect(&self.config.relays)
103 .await
104 .map_err(|e| MeshStoreError::Transport(e.to_string()))?;
105
106 let factory = Arc::new(WebRtcPeerLinkFactory::default());
107 let mut router = MeshRouter::new(
108 peer_id.clone(),
109 transport.clone(),
110 factory,
111 self.config.pools.clone(),
112 self.config.debug,
113 );
114 if let Some(classifier_tx) = self.config.classifier_tx.clone() {
115 router.set_classifier(classifier_tx);
116 }
117
118 let store = Arc::new(ProductionMeshStore::new_with_routing(
119 self.local_store.clone(),
120 Arc::new(router),
121 Duration::from_millis(self.config.request_timeout_ms),
122 self.config.debug,
123 self.routing_config(),
124 ));
125 store
126 .start()
127 .await
128 .map_err(|e| MeshStoreError::Transport(e.to_string()))?;
129
130 self.running.store(true, Ordering::Relaxed);
131
132 let running = self.running.clone();
133 let signaling_store = store.clone();
134 let signaling_transport = transport.clone();
135 let signaling_task = tokio::spawn(async move {
136 loop {
137 if !running.load(Ordering::Relaxed) {
138 break;
139 }
140
141 let Some(msg) = signaling_transport.recv().await else {
142 break;
143 };
144 if signaling_store.process_signaling(msg).await.is_err() {
145 continue;
146 }
147 }
148 });
149
150 let running = self.running.clone();
151 let data_store = store.clone();
152 let data_task = tokio::spawn(async move {
153 let mut ticker = tokio::time::interval(Duration::from_millis(10));
154 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
155 loop {
156 ticker.tick().await;
157 if !running.load(Ordering::Relaxed) {
158 break;
159 }
160 let _ = data_store.drain_available_data_messages().await;
161 }
162 });
163
164 let running = self.running.clone();
165 let hello_store = store.clone();
166 let hello_interval_ms = self.config.hello_interval_ms;
167 let hello_task = tokio::spawn(async move {
168 let mut ticker = tokio::time::interval(Duration::from_millis(hello_interval_ms));
169 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
170 loop {
171 ticker.tick().await;
172 if !running.load(Ordering::Relaxed) {
173 break;
174 }
175 let _ = hello_store.send_hello().await;
176 }
177 });
178
179 *self.live.lock().await = Some(LiveMesh {
180 transport,
181 store,
182 tasks: vec![signaling_task, data_task, hello_task],
183 });
184
185 Ok(())
186 }
187
188 pub async fn stop(&self) {
190 self.running.store(false, Ordering::Relaxed);
191
192 let live = self.live.lock().await.take();
193 let Some(live) = live else {
194 return;
195 };
196
197 live.store.stop().await;
198
199 let peer_ids = live.store.signaling().peer_ids().await;
200 for peer_id in peer_ids {
201 if let Some(channel) = live.store.signaling().get_channel(&peer_id).await {
202 channel.close().await;
203 }
204 }
205
206 for task in live.tasks {
207 task.abort();
208 }
209 live.transport.disconnect().await;
210 }
211
212 pub async fn stats(&self) -> MeshStats {
214 MeshStats {
215 connected_peers: self.peer_count().await,
216 pending_requests: 0,
217 bytes_sent: 0,
218 bytes_received: self.stats_bytes_received.load(Ordering::Relaxed),
219 requests_made: self.stats_requests_made.load(Ordering::Relaxed),
220 requests_fulfilled: self.stats_requests_fulfilled.load(Ordering::Relaxed),
221 }
222 }
223
224 pub async fn peer_count(&self) -> usize {
226 let Some(store) = self.live_store().await else {
227 return 0;
228 };
229 store.peer_count().await
230 }
231
232 pub async fn selector_summary(&self) -> SelectorSummary {
234 let Some(store) = self.live_store().await else {
235 return SelectorSummary::default();
236 };
237 store.selector_summary().await
238 }
239}
240
241#[async_trait]
242impl<S: Store + Send + Sync + 'static> Store for MeshStore<S> {
243 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
244 self.local_store.put(hash, data).await
245 }
246
247 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
248 if let Some(data) = self.local_store.get(hash).await? {
249 return Ok(Some(data));
250 }
251
252 self.stats_requests_made.fetch_add(1, Ordering::Relaxed);
253
254 let Some(store) = self.live_store().await else {
255 return Ok(None);
256 };
257
258 let result = store.get(hash).await?;
259 if let Some(ref data) = result {
260 self.stats_requests_fulfilled
261 .fetch_add(1, Ordering::Relaxed);
262 self.stats_bytes_received
263 .fetch_add(data.len() as u64, Ordering::Relaxed);
264 }
265 Ok(result)
266 }
267
268 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
269 self.local_store.has(hash).await
270 }
271
272 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
273 self.local_store.delete(hash).await
274 }
275}