1use crate::environment::environment::PostBundleMetadata;
24use crate::server::hashiverse_server::HashiverseServer;
25use crate::tools::tools::is_ssrf_protected_ip;
26use bytes::{Bytes, BytesMut};
27use hashiverse_lib::anyhow_assert_eq;
28use hashiverse_lib::protocol::payload::payload::{
29 AnnounceResponseV1, AnnounceV1, BootstrapResponseV1, CachePostBundleFeedbackResponseV1, CachePostBundleFeedbackV1, CachePostBundleResponseV1, CachePostBundleV1, ErrorResponseV1, FetchUrlPreviewResponseV1, FetchUrlPreviewV1,
30 GetPostBundleFeedbackResponseV1, GetPostBundleFeedbackV1, GetPostBundleResponseV1, GetPostBundleV1, HealPostBundleClaimResponseV1, HealPostBundleClaimTokenV1, HealPostBundleClaimV1, HealPostBundleCommitResponseV1, HealPostBundleCommitV1,
31 HealPostBundleFeedbackResponseV1, HealPostBundleFeedbackV1, PayloadRequestKind, PayloadResponseKind, PingResponseV1, PeerStatsRequestV1, PeerStatsResponseV1, SubmitPostClaimResponseV1, SubmitPostClaimTokenV1, SubmitPostClaimV1,
32 SubmitPostCommitResponseV1, SubmitPostCommitTokenV1, SubmitPostCommitV1, SubmitPostFeedbackResponseV1, SubmitPostFeedbackV1, TrendingHashtagV1, TrendingHashtagsFetchResponseV1, TrendingHashtagsFetchV1,
33};
34use hashiverse_lib::protocol::peer::PeerPow;
35use hashiverse_lib::protocol::posting::amplification::get_minimum_post_pow;
36use hashiverse_lib::protocol::posting::encoded_post::EncodedPostV1;
37use hashiverse_lib::protocol::posting::encoded_post_bundle::{EncodedPostBundleHeaderV1, EncodedPostBundleV1};
38use hashiverse_lib::protocol::posting::encoded_post_bundle_feedback::{EncodedPostBundleFeedbackHeaderV1, EncodedPostBundleFeedbackV1};
39use hashiverse_lib::protocol::rpc::rpc_request::RpcRequestPacketRx;
40use hashiverse_lib::protocol::rpc::rpc_response::{RpcResponsePacketTx, RpcResponsePacketTxFlags};
41use hashiverse_lib::tools::buckets::{BucketLocation, BucketType, BUCKET_DURATIONS};
42use hashiverse_lib::tools::time::{TimeMillis, MILLIS_IN_SECOND};
43use hashiverse_lib::tools::types::{Id, Signature};
44use hashiverse_lib::tools::{hashing, url_preview};
45use hashiverse_lib::tools::{compression, config, json, signing, BytesGatherer};
46use hashiverse_lib::transport::transport::IncomingRequest;
47use log::{info, trace, warn};
48use std::collections::HashSet;
49use std::sync::atomic::Ordering;
50
51use crate::server::stats::{environment_stats_subtree, kademlia_stats_subtree, request_counts_subtree, system_stats_subtree};
52use tokio::sync::mpsc;
53use tokio_util::sync::CancellationToken;
54
/// Hashtags that can be used as filler entries for a trending-hashtags list.
///
/// NOTE(review): presumably supplied as the `fallback_hashtags` argument of
/// `top_up_trending_hashtags_with_fallback`; the call site is not visible here — confirm.
const TRENDING_HASHTAGS_FALLBACK: &[&str] = &["hashiverse", "news"];
61
/// Produces the comparison form of a hashtag: lowercased, with at most one leading `#` removed.
///
/// Only a single leading `#` is stripped; `"##Tag"` becomes `"#tag"`.
fn normalise_hashtag(hashtag: &str) -> String {
    let mut folded = hashtag.to_lowercase();
    if folded.starts_with('#') {
        // '#' is a single-byte character, so removing index 0 drops exactly that marker.
        folded.remove(0);
    }
    folded
}
71
72fn top_up_trending_hashtags_with_fallback(trending_hashtags: &mut Vec<TrendingHashtagV1>, limit: u16, fallback_hashtags: &[&str]) {
77 let target_length = limit as usize;
78 if trending_hashtags.len() >= target_length {
79 return;
80 }
81
82 let mut existing_normalised_hashtags: HashSet<String> = trending_hashtags.iter()
83 .map(|entry| normalise_hashtag(&entry.hashtag))
84 .collect();
85
86 for fallback_hashtag in fallback_hashtags {
87 if trending_hashtags.len() >= target_length {
88 break;
89 }
90 let normalised_fallback_hashtag = normalise_hashtag(fallback_hashtag);
91 if existing_normalised_hashtags.contains(&normalised_fallback_hashtag) {
92 continue;
93 }
94 trending_hashtags.push(TrendingHashtagV1 {
95 hashtag: (*fallback_hashtag).to_string(),
96 count: 0,
97 });
98 existing_normalised_hashtags.insert(normalised_fallback_hashtag);
99 }
100}
101
102impl HashiverseServer {
103 pub async fn wrap_and_dispatch_network_envelopes(&self, cancellation_token: CancellationToken, mut rx: mpsc::Receiver<IncomingRequest>) -> Result<(), anyhow::Error> {
104 loop {
105 tokio::select! {
106 _ = cancellation_token.cancelled() => { break },
107
108 receipt = rx.recv() => {
109 match receipt {
110 Some(incoming) => {
111 let result = self.wrap_and_dispatch_network_envelope(cancellation_token.clone(), &incoming).await;
113 match result {
114 Ok(bytes) => {
115 let result = incoming.reply.send(bytes);
116 if result.is_err() { warn!("failed to send reply"); }
117 },
118 Err(e) => {
119 warn!("failed to process packet from {}: {}", incoming.caller_address, e);
120 incoming.report_bad_request();
121 drop(incoming.reply);
122 },
123 }
124 },
125 None => {
126 warn!("channel closed");
127 break;
128 }
129 }
130 }
131 }
132 }
133
134 Ok(())
135 }
136
137 async fn wrap_and_dispatch_network_envelope(&self, cancellation_token: CancellationToken, incoming: &IncomingRequest) -> anyhow::Result<BytesGatherer> {
138 let caller_address = incoming.caller_address.as_str();
139 let current_time_millis = self.runtime_services.time_provider.current_time_millis();
140
141 let rpc_request_packet_rx = RpcRequestPacketRx::decode(¤t_time_millis, &self.server_id.keys.verification_key_bytes, &self.server_id.keys.pq_commitment_bytes, incoming.bytes.clone())?;
143 self.request_counters[rpc_request_packet_rx.payload_request_kind.clone() as usize].fetch_add(1, Ordering::Relaxed);
149
150 {
152 if self.seen_salts.contains_key(&rpc_request_packet_rx.pow_salt) {
153 anyhow::bail!("replay detected: salt already seen");
154 }
155 self.seen_salts.insert(rpc_request_packet_rx.pow_salt, ());
156 }
157
158 let pow_content_hash = rpc_request_packet_rx.pow_content_hash;
160
161 let dispatch_result: anyhow::Result<BytesGatherer> = try {
162 let pow = match rpc_request_packet_rx.pow_server_known {
164 true => {
165 let (pow, improved_pow_current_day, improved_pow_current_month) = {
166 let peer_self = self.peer_self.read(); let pow = PeerPow::new(
168 rpc_request_packet_rx.pow_sponsor_id,
169 &peer_self.verification_key_bytes,
170 &peer_self.pq_commitment_bytes,
171 rpc_request_packet_rx.pow_timestamp,
172 rpc_request_packet_rx.pow_content_hash,
173 rpc_request_packet_rx.pow_salt,
174 )?;
175
176 let improved_pow_current_day = pow.pow_decayed_day(current_time_millis) > peer_self.pow_current_day.pow_decayed_day(current_time_millis);
177 let improved_pow_current_month = pow.pow_decayed_month(current_time_millis) > peer_self.pow_current_month.pow_decayed_month(current_time_millis);
178
179 (pow, improved_pow_current_day, improved_pow_current_month)
180 };
181
182 if improved_pow_current_day || improved_pow_current_month {
184 let mut peer_self = self.peer_self.write(); if improved_pow_current_day {
186 trace!("pow_current_day upgraded {} -> {}", peer_self.pow_current_day, pow);
187 peer_self.pow_current_day = pow.clone();
188 }
189 if improved_pow_current_month {
190 trace!("pow_current_month upgraded {} -> {}", peer_self.pow_current_month, pow);
191 peer_self.pow_current_month = pow.clone();
192 }
193
194 peer_self.sign(self.runtime_services.time_provider.as_ref(), &self.server_id.keys.signature_key)?;
195 }
196
197 Some(pow)
198 }
199
200 false => {
201 match rpc_request_packet_rx.payload_request_kind {
203 PayloadRequestKind::BootstrapV1 => {}
204 _ => anyhow::bail!("Anonymous pow not allowed for {}", rpc_request_packet_rx.payload_request_kind),
205 }
206
207 None
208 }
209 };
210
211 let (compress_response, payload_response_kind, payload) = self.dispatch_network_envelope(cancellation_token, pow, rpc_request_packet_rx).await?;
213 let response_flags = match compress_response {
214 true => RpcResponsePacketTxFlags::COMPRESSED,
215 false => RpcResponsePacketTxFlags::empty(),
216 };
217
218 RpcResponsePacketTx::encode(
220 &self.server_id.keys.signature_key,
221 &self.server_id.keys.verification_key_bytes,
222 &self.server_id.keys.pq_commitment_bytes,
223 &self.server_id.sponsor_id,
224 &self.server_id.timestamp,
225 &self.server_id.hash,
226 &self.server_id.salt,
227 &pow_content_hash,
228 response_flags,
229 payload_response_kind,
230 payload,
231 )?
232 };
233
234 match dispatch_result {
235 Ok(results) => Ok(results),
236 Err(e) => {
237 warn!("failed to dispatch packet from {}: {}", caller_address, e);
238 incoming.report_bad_request();
239
240 let payload_response_kind = PayloadResponseKind::ErrorResponseV1;
241 let response = ErrorResponseV1 { code: 0, message: e.to_string() };
242 let payload = BytesGatherer::from_bytes(json::struct_to_bytes(&response)?);
243
244 RpcResponsePacketTx::encode(
246 &self.server_id.keys.signature_key,
247 &self.server_id.keys.verification_key_bytes,
248 &self.server_id.keys.pq_commitment_bytes,
249 &self.server_id.sponsor_id,
250 &self.server_id.timestamp,
251 &self.server_id.hash,
252 &self.server_id.salt,
253 &pow_content_hash,
254 RpcResponsePacketTxFlags::COMPRESSED,
255 payload_response_kind,
256 payload,
257 )
258 }
259 }
260 }
261
262 async fn dispatch_network_envelope(&self, cancellation_token: CancellationToken, pow: Option<PeerPow>, rpc_request_packet_rx: RpcRequestPacketRx) -> anyhow::Result<(bool, PayloadResponseKind, BytesGatherer)> {
263 let compress_response = match rpc_request_packet_rx.payload_request_kind {
265 PayloadRequestKind::GetPostBundleV1 => false, PayloadRequestKind::CachePostBundleV1 => false, _ => true,
268 };
269
270 let (payload_response_kind, payload) = match rpc_request_packet_rx.payload_request_kind {
271 PayloadRequestKind::ErrorV1 => {
272 anyhow::bail!("Received ErrorV1");
273 }
274 PayloadRequestKind::PingV1 => self.dispatch_network_payload_x_PingV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
275 PayloadRequestKind::BootstrapV1 => self.dispatch_network_payload_x_BootstrapV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
276 PayloadRequestKind::AnnounceV1 => self.dispatch_network_payload_x_AnnounceV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
277 PayloadRequestKind::GetPostBundleV1 => self.dispatch_network_payload_x_GetPostBundleV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
278 PayloadRequestKind::GetPostBundleFeedbackV1 => { self.dispatch_network_payload_x_GetPostBundleFeedbackV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
279 PayloadRequestKind::SubmitPostClaimV1 => { self.dispatch_network_payload_x_SubmitPostClaimV1(cancellation_token, pow, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
280 PayloadRequestKind::SubmitPostCommitV1 => { self.dispatch_network_payload_x_SubmitPostCommitV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
281 PayloadRequestKind::SubmitPostFeedbackV1 => { self.dispatch_network_payload_x_SubmitPostFeedbackV1(cancellation_token, pow, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
282 PayloadRequestKind::HealPostBundleClaimV1 => { self.dispatch_network_payload_x_HealPostBundleClaimV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
283 PayloadRequestKind::HealPostBundleCommitV1 => { self.dispatch_network_payload_x_HealPostBundleCommitV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
284 PayloadRequestKind::HealPostBundleFeedbackV1 => { self.dispatch_network_payload_x_HealPostBundleFeedbackV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
285 PayloadRequestKind::CachePostBundleV1 => self.dispatch_network_payload_x_CachePostBundleV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
286 PayloadRequestKind::CachePostBundleFeedbackV1 => { self.dispatch_network_payload_x_CachePostBundleFeedbackV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
287 PayloadRequestKind::FetchUrlPreviewV1 => self.dispatch_network_payload_x_FetchUrlPreviewV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
288 PayloadRequestKind::TrendingHashtagsFetchV1 => self.dispatch_network_payload_x_TrendingHashtagsFetchV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
289 PayloadRequestKind::PeerStatsRequestV1 => self.dispatch_network_payload_x_PeerStatsRequestV1(cancellation_token, pow, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
290 };
291
292 Ok((compress_response, payload_response_kind, payload))
293 }
294
295 #[allow(non_snake_case)]
296 async fn dispatch_network_payload_x_PingV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, _bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
297 anyhow_assert_eq!(&PayloadRequestKind::PingV1, &payload_request_kind);
298 let peer = self.peer_self.read().clone();
299 let json = json::struct_to_bytes(&PingResponseV1 { peer })?;
300 Ok((PayloadResponseKind::PingResponseV1, BytesGatherer::from_bytes(json)))
301 }
302
303 #[allow(non_snake_case)]
304 async fn dispatch_network_payload_x_BootstrapV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, _bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
305 anyhow_assert_eq!(&PayloadRequestKind::BootstrapV1, &payload_request_kind);
306 let peers_random = self.kademlia.read().get_peers_random(config::BOOTSTRAP_V1_NUM_PEERS);
307 let json = json::struct_to_bytes(&BootstrapResponseV1 { peers_random })?;
308 Ok((PayloadResponseKind::BootstrapResponseV1, BytesGatherer::from_bytes(json)))
309 }
310
311 #[allow(non_snake_case)]
312 async fn dispatch_network_payload_x_AnnounceV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
313 anyhow_assert_eq!(&PayloadRequestKind::AnnounceV1, &payload_request_kind);
314
315 let request = json::bytes_to_struct::<AnnounceV1>(&bytes)?;
316 let peer = request.peer_self;
319 let peer_id = peer.id;
320
321 self.add_potential_peer_to_kademlia(peer, self.runtime_services.time_provider.as_ref().current_time_millis()).await;
323
324 let (peers_nearest, _) = self.kademlia.read().get_peers_for_key(&peer_id, config::ANNOUNCE_V1_NUM_PEERS);
325
326 let json = json::struct_to_bytes(&AnnounceResponseV1 {
327 peer_self: self.peer_self.read().clone(),
328 peers_nearest,
329 })?;
330 Ok((PayloadResponseKind::AnnounceResponseV1, BytesGatherer::from_bytes(json)))
331 }
332
333 #[allow(non_snake_case)]
334 async fn dispatch_network_payload_x_GetPostBundleV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
335 anyhow_assert_eq!(&PayloadRequestKind::GetPostBundleV1, &payload_request_kind);
336
337 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
338
339 let request = json::bytes_to_struct::<GetPostBundleV1>(&bytes)?;
340 trace!("received GetPostBundleV1: bucket_location={}", request.bucket_location);
341
342 request.bucket_location.validate()?;
344
345 {
347 for peer in request.peers_visited {
348 self.add_potential_peer_to_kademlia(peer, time_millis).await;
349 }
350 }
351
352 let peer_self = self.peer_self.read().clone();
353
354 let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.bucket_location.location_id, config::REDUNDANT_SERVERS_PER_POST);
356 if !among_peers_nearer {
357 warn!("I am not in peers_nearer {}", peer_self);
358 }
359
360 let post_bundle = match among_peers_nearer {
361 true => {
362 let _post_bundle_lock = self.environment.get_write_lock_for_location_id(&request.bucket_location.location_id);
365
366 let mut encoded_post_bundle_bytes: Option<Bytes> = None;
367
368 let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
370 if let Some(mut post_bundle_metadata) = post_bundle_metadata {
371 encoded_post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.bucket_location.location_id)?;
372
373 if !post_bundle_metadata.sealed {
375 let sealed = time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;
376 if sealed {
377 if let Some(encoded_post_bundle_bytes_old) = encoded_post_bundle_bytes {
379 let mut encoded_post_bundle = EncodedPostBundleV1::from_bytes(encoded_post_bundle_bytes_old, true)?;
380 encoded_post_bundle.header.time_millis = time_millis;
381 encoded_post_bundle.header.sealed = true;
382 encoded_post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
383 let encoded_post_bundle_bytes_new = encoded_post_bundle.to_bytes()?;
384 self.environment.put_post_bundle_bytes(time_millis, &request.bucket_location.location_id, &encoded_post_bundle_bytes_new)?;
385 encoded_post_bundle_bytes = Some(encoded_post_bundle_bytes_new);
386 }
387
388 post_bundle_metadata.sealed = true;
390 self.environment.put_post_bundle_metadata(time_millis, &request.bucket_location.location_id, &post_bundle_metadata, 0)?;
391 }
392 else {
393 if let Some(encoded_post_bundle_bytes_old) = encoded_post_bundle_bytes {
395 let mut encoded_post_bundle = EncodedPostBundleV1::from_bytes(encoded_post_bundle_bytes_old, true)?;
396 encoded_post_bundle.header.time_millis = time_millis;
397 encoded_post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
398 let encoded_post_bundle_bytes_new = encoded_post_bundle.to_bytes()?;
399 encoded_post_bundle_bytes = Some(encoded_post_bundle_bytes_new);
400 }
401 }
402 }
403 };
404
405 if encoded_post_bundle_bytes.is_none() {
408 let sealed = time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;
409
410 let mut header = EncodedPostBundleHeaderV1 {
411 time_millis,
412 location_id: request.bucket_location.location_id,
413 overflowed: false,
414 sealed,
415 num_posts: 0,
416 encoded_post_ids: vec![],
417 encoded_post_lengths: vec![],
418 encoded_post_healed: HashSet::new(),
419 peer: peer_self.clone(),
420 signature: Signature::zero(),
421 };
422 header.signature_generate(&self.server_id.keys.signature_key)?;
423
424 let encoded_post_bundle = EncodedPostBundleV1 { header, encoded_posts_bytes: Bytes::new() };
425 encoded_post_bundle_bytes = Some(encoded_post_bundle.to_bytes()?);
426 }
427
428 encoded_post_bundle_bytes
429 }
430 false => None,
431 };
432
433 let cache_result = self.post_bundle_cache.on_get(&request.bucket_location, &request.already_retrieved_peer_ids, &peer_self, &self.server_id, time_millis);
434
435 let get_post_bundle_response = GetPostBundleResponseV1 {
436 peers_nearer,
437 cache_request_token: cache_result.cache_request_token,
438 post_bundles_cached: cache_result.cached_items,
439 post_bundle,
440 };
441 Ok((PayloadResponseKind::GetPostBundleResponseV1, get_post_bundle_response.to_bytes_gatherer()?))
442 }
443
444 #[allow(non_snake_case)]
445 async fn dispatch_network_payload_x_GetPostBundleFeedbackV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
446 anyhow_assert_eq!(&PayloadRequestKind::GetPostBundleFeedbackV1, &payload_request_kind);
447
448 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
449
450 let request = json::bytes_to_struct::<GetPostBundleFeedbackV1>(&bytes)?;
451 trace!("received GetPostBundleFeedbackV1");
452
453 {
455 for peer in request.peers_visited {
456 self.add_potential_peer_to_kademlia(peer, time_millis).await;
457 }
458 }
459
460 let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.bucket_location.location_id, config::REDUNDANT_SERVERS_PER_POST);
462
463 let mut post_bundle_encoded_feedbacks_bytes: Option<Bytes> = None;
464
465 if among_peers_nearer {
466 let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
468 if post_bundle_metadata.is_some() {
469 post_bundle_encoded_feedbacks_bytes = Some(self.environment.get_post_bundle_encoded_post_feedbacks_bytes(time_millis, &request.bucket_location.location_id)?);
470 }
471 }
472
473 let peer_self = self.peer_self.read().clone();
475 let encoded_post_bundle_feedback = match post_bundle_encoded_feedbacks_bytes {
476 Some(feedbacks_bytes) => {
477
478 let feedbacks_bytes_hash = hashing::hash(feedbacks_bytes.as_ref());
479
480 let mut header = EncodedPostBundleFeedbackHeaderV1 {
481 time_millis,
482 location_id: request.bucket_location.location_id,
483 feedbacks_bytes_hash,
484 peer: peer_self.clone(),
485 signature: Signature::zero(),
486 };
487 header.signature_generate(&self.server_id.keys.signature_key);
488
489 let encoded_post_bundle_feedback = EncodedPostBundleFeedbackV1 {
490 header,
491 feedbacks_bytes,
492 };
493 Some(encoded_post_bundle_feedback.to_bytes()?)
494 }
495 None => None,
496 };
497
498 let cache_result = self.post_bundle_feedback_cache.on_get(&request.bucket_location, &request.already_retrieved_peer_ids, &peer_self, &self.server_id, time_millis);
499
500 let get_post_bundle_feedback_response = GetPostBundleFeedbackResponseV1 {
501 peers_nearer,
502 cache_request_token: cache_result.cache_request_token,
503 post_bundle_feedbacks_cached: cache_result.cached_items,
504 encoded_post_bundle_feedback,
505 };
506 Ok((PayloadResponseKind::GetPostBundleFeedbackResponseV1, get_post_bundle_feedback_response.to_bytes_gatherer()?))
507 }
508
509 #[allow(non_snake_case)]
510 async fn dispatch_network_payload_x_SubmitPostClaimV1(&self, _cancellation_token: CancellationToken, pow: Option<PeerPow>, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
511 anyhow_assert_eq!(&PayloadRequestKind::SubmitPostClaimV1, &payload_request_kind);
512
513 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
514
515 let pow = match pow {
516 Some(pow) => pow,
517 None => anyhow::bail!("We need pow for a submit post claim"),
518 };
519
520 let request = SubmitPostClaimV1::from_bytes(&mut bytes)?;
521 trace!("received SubmitPostClaimV1");
522
523 request.bucket_location.validate()?;
525
526 let bucket_duration = {
528 let bucket_duration = BUCKET_DURATIONS.iter().find(|bucket_duration| **bucket_duration == request.bucket_location.duration);
529 match bucket_duration {
530 Some(bucket_duration) => *bucket_duration,
531 None => anyhow::bail!("Unrecognised bucket duration provided"),
532 }
533 };
534
535 let decoded_post = EncodedPostV1::decode_from_bytes(request.encoded_post_bytes, &request.bucket_location.base_id, false, false)?;
536
537 {
539 let pow_minimum = get_minimum_post_pow(decoded_post.header.post_length, decoded_post.header.linked_base_ids.len(), request.bucket_location.duration);
540 if pow.pow < pow_minimum {
541 anyhow::bail!("Insufficient proof of work for this post: actual={} < expected={}", pow.pow, pow_minimum);
542 }
543 }
544
545 {
547 let timestamp = BucketLocation::round_down_to_bucket_start(decoded_post.header.time_millis, bucket_duration);
549 if timestamp != request.bucket_location.bucket_time_millis {
550 anyhow::bail!("The post timestamp does not match the bucket");
551 }
552 }
553
554 let client_id = decoded_post.header.client_id()?;
555
556 if !decoded_post.header.linked_base_ids.contains(&request.bucket_location.base_id) {
558 anyhow::bail!("The base_id is not related to the post");
559 }
560
561 if request.bucket_location.bucket_type == BucketType::User && request.bucket_location.base_id != client_id.id {
563 anyhow::bail!("Only the posting user is allowed to post to a bucket of type USER");
564 }
565
566 if matches!(request.bucket_location.bucket_type, BucketType::ReplyToPost | BucketType::Sequel) {
569 let original_header_bytes = request.referenced_post_header_bytes
570 .ok_or_else(|| anyhow::anyhow!("{:?} posts require the original post's header bytes", request.bucket_location.bucket_type))?;
571
572 let original_post = EncodedPostV1::decode_from_bytes(original_header_bytes, &client_id.id, false, false)?;
575
576 if original_post.post_id != request.bucket_location.base_id {
578 anyhow::bail!("Referenced post header's post_id does not match the bucket's base_id");
579 }
580
581 if request.bucket_location.bucket_type == BucketType::Sequel {
583 let original_client_id = original_post.header.client_id()?;
584 if original_client_id != client_id {
585 anyhow::bail!("Sequel post author does not match original post author");
586 }
587 }
588 }
589
590 {
592 let delta = (time_millis - decoded_post.header.time_millis).abs();
593 if delta > config::CLIENT_POST_TIMESTAMP_DELTA_THRESHOLD {
594 anyhow::bail!("The post timestamp delta is too large ({} > {})", delta, config::CLIENT_POST_TIMESTAMP_DELTA_THRESHOLD);
595 }
596 }
597
598 let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.bucket_location.location_id, config::REDUNDANT_SERVERS_PER_POST);
600
601 let submit_post_claim_token = match among_peers_nearer {
602 true => {
603 let _post_bundle_lock = self.environment.get_write_lock_for_location_id(&request.bucket_location.location_id);
605 let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
606 let mut post_bundle_metadata = post_bundle_metadata.unwrap_or_else(PostBundleMetadata::zero);
607
608 if !post_bundle_metadata.sealed {
610 post_bundle_metadata.num_posts_granted += 1;
611 post_bundle_metadata.overflowed = post_bundle_metadata.num_posts > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS || post_bundle_metadata.num_posts_granted > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS_GRANTED;
612 post_bundle_metadata.sealed = post_bundle_metadata.overflowed || time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;
613
614 self.environment.put_post_bundle_metadata(time_millis, &request.bucket_location.location_id, &post_bundle_metadata, 0)?;
615 }
616
617 match post_bundle_metadata.sealed {
619 false => {
620 info!("Granted for {}: num_posts={} num_posts_granted={}", request.bucket_location, post_bundle_metadata.num_posts, post_bundle_metadata.num_posts_granted);
621 Some(SubmitPostClaimTokenV1::new(self.peer_self.read().clone(), request.bucket_location.clone(), decoded_post.post_id, &self.server_id.keys.signature_key))
622 }
623 true => {
624 info!(
625 "Not granting SubmitPostClaimTokenV1 to {} as we have num_posts={} num_posts_granted={}",
626 request.bucket_location, post_bundle_metadata.num_posts, post_bundle_metadata.num_posts_granted
627 );
628 None
629 }
630 }
631 }
632
633 false => None,
634 };
635
636 if submit_post_claim_token.is_some() {
638 if request.bucket_location.bucket_type == BucketType::User && !request.referenced_hashtags.is_empty() {
640 let author_verification_key_bytes = &decoded_post.header.verification_key_bytes;
641 for referenced_hashtag in &request.referenced_hashtags {
642 let hashtag_id = match Id::from_hashtag_str(referenced_hashtag) {
643 Ok(id) => id,
644 Err(_) => continue, };
646 if !decoded_post.header.linked_base_ids.contains(&hashtag_id) {
647 continue; }
649 let mut hll = self.trending_hashtags.get(referenced_hashtag).unwrap_or_default();
650 hll.insert(author_verification_key_bytes.as_ref());
651 self.trending_hashtags.insert(referenced_hashtag.clone(), hll);
652 }
653 }
654 }
655
656 let response = SubmitPostClaimResponseV1 { peers_nearer, submit_post_claim_token };
657 Ok((PayloadResponseKind::SubmitPostClaimResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
658 }
659
660 #[allow(non_snake_case)]
661 async fn dispatch_network_payload_x_SubmitPostCommitV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
662 anyhow_assert_eq!(&PayloadRequestKind::SubmitPostCommitV1, &payload_request_kind);
663
664 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
665
666 let request = SubmitPostCommitV1::from_bytes(&mut bytes)?;
667 trace!("received SubmitPostCommitV1");
668
669 let peer_self = self.peer_self.read(); if request.submit_post_claim_token.peer.id != peer_self.id {
673 anyhow::bail!("The submit_post_claim_token is not from us");
674 }
675
676 request.bucket_location.validate()?;
678 if request.bucket_location != request.submit_post_claim_token.bucket_location {
679 anyhow::bail!("The location_id in the SubmitPostCommit does not match the bucket_location in the SubmitPostClaimToken");
680 }
681
682 let decoded_post = EncodedPostV1::decode_from_bytes(request.encoded_post_bytes.clone(), &request.bucket_location.base_id, true, false)?;
684
685 if decoded_post.post_id != request.submit_post_claim_token.post_id {
687 anyhow::bail!("The post_id of the committed post does not match the post_id in the SubmitPostClaimToken");
688 }
689
690 let _post_bundle_lock = self.environment.get_write_lock_for_location_id(&request.bucket_location.location_id);
692 let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
693 let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.bucket_location.location_id)?;
694
695 let mut post_bundle_metadata = post_bundle_metadata.unwrap_or_else(PostBundleMetadata::zero);
697
698 let mut post_bundle = match post_bundle_bytes {
700 Some(bytes) => {
701 let bytes = Bytes::from_owner(bytes);
702
703 EncodedPostBundleV1::from_bytes(bytes, true)?
704 }
705 None => {
706 let header = EncodedPostBundleHeaderV1 {
707 time_millis: TimeMillis::zero(),
708 location_id: request.bucket_location.location_id,
709 overflowed: false,
710 sealed: false,
711 num_posts: 0,
712 encoded_post_ids: vec![],
713 encoded_post_lengths: vec![],
714 encoded_post_healed: HashSet::new(),
715 peer: self.peer_self.read().clone(),
716 signature: Signature::zero(),
717 };
718
719 EncodedPostBundleV1 { header, encoded_posts_bytes: Bytes::new() }
720 }
721 };
722
723 if post_bundle.header.encoded_post_ids.contains(&decoded_post.post_id) {
725 anyhow::bail!("Post {} is already in the bundle", decoded_post.post_id);
726 }
727
728 post_bundle.header.time_millis = time_millis;
730 post_bundle.header.num_posts += 1;
731 post_bundle.header.overflowed = post_bundle.header.num_posts > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS || post_bundle_metadata.num_posts_granted > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS_GRANTED;
732 post_bundle.header.sealed = post_bundle.header.overflowed || time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;
733 post_bundle.header.encoded_post_ids.push(decoded_post.post_id);
734 post_bundle.header.encoded_post_lengths.push(request.encoded_post_bytes.len());
735 post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
736 let mut posts_mut = BytesMut::from(post_bundle.encoded_posts_bytes.as_ref());
737 posts_mut.extend_from_slice(request.encoded_post_bytes.as_ref());
738 post_bundle.encoded_posts_bytes = posts_mut.freeze();
739 let post_bundle_bytes_new = post_bundle.to_bytes()?;
740
741 post_bundle_metadata.num_posts = post_bundle.header.num_posts;
743 post_bundle_metadata.overflowed = post_bundle.header.overflowed;
744 post_bundle_metadata.sealed = post_bundle.header.sealed;
745 post_bundle_metadata.size = post_bundle.header.encoded_post_lengths.iter().sum();
746
747 {
748 self.environment.put_post_bundle_bytes(time_millis, &request.bucket_location.location_id, &post_bundle_bytes_new)?;
749 self.environment
750 .put_post_bundle_metadata(time_millis, &request.bucket_location.location_id, &post_bundle_metadata, request.encoded_post_bytes.len())?;
751 }
752
753 info!("Persisted for {}: num_posts={} num_posts_granted={}", request.bucket_location, post_bundle_metadata.num_posts, post_bundle_metadata.num_posts_granted);
754
755 let submit_post_commit_token = SubmitPostCommitTokenV1::new(peer_self.clone(), request.bucket_location, decoded_post.post_id, &self.server_id.keys.signature_key);
756
757 let response = SubmitPostCommitResponseV1 { submit_post_commit_token };
758 Ok((PayloadResponseKind::SubmitPostCommitResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
759 }
760
761 #[allow(non_snake_case)]
762 async fn dispatch_network_payload_x_SubmitPostFeedbackV1(&self, _cancellation_token: CancellationToken, pow: Option<PeerPow>, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
763 anyhow_assert_eq!(&PayloadRequestKind::SubmitPostFeedbackV1, &payload_request_kind);
764
765 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
766
767 let request = SubmitPostFeedbackV1::from_bytes(&mut bytes)?;
768 trace!("received SubmitPostFeedbackV1");
769
770 let pow = pow.ok_or_else(|| anyhow::anyhow!("pow required for SubmitPostFeedbackV1"))?;
772 if pow.pow < config::POW_MINIMUM_PER_FEEDBACK {
773 anyhow::bail!("Insufficient pow for feedback: {} < {}", pow.pow, config::POW_MINIMUM_PER_FEEDBACK);
774 }
775
776 request.encoded_post_feedback.pow_verify()?;
778
779 let location_id = request.bucket_location.location_id;
780
781 let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&location_id, config::REDUNDANT_SERVERS_PER_POST);
783
784 let accepted = (|| -> anyhow::Result<bool> {
785 if !among_peers_nearer {
786 return Ok(false);
787 }
788
789 let _post_bundle_lock = self.environment.get_read_lock_for_location_id(&location_id);
791 let Some(post_bundle_bytes) = self.environment.get_post_bundle_bytes(time_millis, &location_id)?
792 else {
793 return Ok(false);
794 };
795
796 let post_bundle = EncodedPostBundleV1::from_bytes(Bytes::from_owner(post_bundle_bytes), false)?;
797 if !post_bundle.header.encoded_post_ids.contains(&request.encoded_post_feedback.post_id) {
798 return Ok(false);
799 }
800
801 Ok(true)
802 })()?;
803
804 if accepted {
806 trace!("Accepted post feedback for location_id={} encoded_post_feedback={:?}", location_id, request.encoded_post_feedback);
807 self.environment.put_post_feedback_if_more_powerful(time_millis, &location_id, &request.encoded_post_feedback)?;
808 }
809
810 let response = SubmitPostFeedbackResponseV1 { peers_nearer, accepted };
811 Ok((PayloadResponseKind::SubmitPostFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
812 }
813
814 #[allow(non_snake_case)]
815 async fn dispatch_network_payload_x_HealPostBundleClaimV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
816 anyhow_assert_eq!(&PayloadRequestKind::HealPostBundleClaimV1, &payload_request_kind);
817
818 fn generate_negatory_response() -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
819 let response = HealPostBundleClaimResponseV1 { needed_post_ids: vec![], token: None };
820 Ok((PayloadResponseKind::HealPostBundleClaimResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
821 }
822
823 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
824 let request = HealPostBundleClaimV1::from_bytes(&mut bytes)?;
825 trace!("received HealPostBundleClaimV1");
826
827 request.bucket_location.validate()?;
829 if request.bucket_location.location_id != request.donor_header.location_id {
830 anyhow::bail!("HealPostBundleClaimV1: bucket_location.location_id does not match donor_header.location_id");
831 }
832
833 request.donor_header.verify()?;
835
836 let (_, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.donor_header.location_id, config::REDUNDANT_SERVERS_PER_POST);
838 if !among_peers_nearer {
839 return generate_negatory_response();
840 }
841
842 if self.heal_in_progress.contains_key(&request.donor_header.location_id) {
844 return generate_negatory_response();
845 }
846
847 let _lock = self.environment.get_read_lock_for_location_id(&request.donor_header.location_id);
849 let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.donor_header.location_id)?;
850
851 let our_post_ids: HashSet<Id> = match post_bundle_bytes {
852 Some(bytes) => {
853 let bundle = EncodedPostBundleV1::from_bytes(Bytes::from_owner(bytes), false)?;
854 bundle.header.encoded_post_ids.into_iter().collect()
855 }
856 None => HashSet::new(),
857 };
858
859 let needed_post_ids: Vec<Id> = request.donor_header.encoded_post_ids.iter().filter(|id| !our_post_ids.contains(*id)).copied().collect();
861
862 if needed_post_ids.is_empty() {
863 return generate_negatory_response();
864 }
865
866 self.heal_in_progress.insert(request.donor_header.location_id, ());
867
868 let token = Some(HealPostBundleClaimTokenV1::new(
869 self.peer_self.read().clone(),
870 request.bucket_location,
871 needed_post_ids.clone(),
872 request.donor_header.signature,
873 &self.server_id.keys.signature_key,
874 ));
875 let response = HealPostBundleClaimResponseV1 { needed_post_ids, token };
876 Ok((PayloadResponseKind::HealPostBundleClaimResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
877 }
878
879 #[allow(non_snake_case)]
880 async fn dispatch_network_payload_x_HealPostBundleCommitV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
881 anyhow_assert_eq!(&PayloadRequestKind::HealPostBundleCommitV1, &payload_request_kind);
882
883 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
884 let request = HealPostBundleCommitV1::from_bytes(&mut bytes)?;
885 trace!("received HealPostBundleCommitV1");
886
887 let peer_self = self.peer_self.read().clone();
889 if request.token.peer.id != peer_self.id {
890 anyhow::bail!("HealPostBundleCommitV1: token was not issued by this server");
891 }
892 request.token.verify()?;
893
894 if request.donor_header.signature != request.token.donor_header_signature {
896 anyhow::bail!("HealPostBundleCommitV1: donor_header signature does not match token");
897 }
898 request.donor_header.verify()?;
899
900 if request.token.bucket_location.location_id != request.donor_header.location_id {
901 anyhow::bail!("HealPostBundleCommitV1: token location_id does not match donor_header");
902 }
903
904 let location_id = request.donor_header.location_id;
905
906 let mut remaining_bytes = request.encoded_posts_bytes.clone();
908 let mut posts_to_add: Vec<(Id, Bytes)> = Vec::new();
909 for post_id in &request.token.needed_post_ids {
910 let len = request
911 .donor_header
912 .encoded_post_ids
913 .iter()
914 .zip(request.donor_header.encoded_post_lengths.iter())
915 .find(|(id, _)| *id == post_id)
916 .map(|(_, len)| *len)
917 .ok_or_else(|| anyhow::anyhow!("needed_post_id {} not found in donor_header", post_id))?;
918 if remaining_bytes.len() < len {
919 anyhow::bail!("HealPostBundleCommitV1: not enough bytes for post {}", post_id);
920 }
921 let post_bytes = remaining_bytes.split_to(len);
922 posts_to_add.push((*post_id, post_bytes));
923 }
924 if !remaining_bytes.is_empty() {
925 anyhow::bail!("HealPostBundleCommitV1: {} excess bytes", remaining_bytes.len());
926 }
927
928 for (post_id, post_bytes) in &posts_to_add {
930 EncodedPostV1::decode_from_bytes(post_bytes.clone(), &request.token.bucket_location.base_id, true, true).map_err(|e| anyhow::anyhow!("HealPostBundleCommitV1: post {} failed decryption: {}", post_id, e))?;
931 }
932
933 let _lock = self.environment.get_write_lock_for_location_id(&location_id);
935 let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &location_id)?;
936 let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &location_id)?;
937 let mut post_bundle_metadata = post_bundle_metadata.unwrap_or_else(PostBundleMetadata::zero);
938
939 let mut post_bundle = match post_bundle_bytes {
940 Some(b) => EncodedPostBundleV1::from_bytes(Bytes::from_owner(b), true)?,
941 None => EncodedPostBundleV1 {
942 header: EncodedPostBundleHeaderV1 {
943 time_millis: TimeMillis::zero(),
944 location_id,
945 overflowed: request.donor_header.overflowed,
946 sealed: request.donor_header.sealed,
947 num_posts: 0,
948 encoded_post_ids: vec![],
949 encoded_post_lengths: vec![],
950 encoded_post_healed: HashSet::new(),
951 peer: peer_self.clone(),
952 signature: Signature::zero(),
953 },
954 encoded_posts_bytes: Bytes::new(),
955 },
956 };
957
958 let our_post_ids: HashSet<Id> = post_bundle.header.encoded_post_ids.iter().copied().collect();
959
960 let mut posts_mut = BytesMut::from(post_bundle.encoded_posts_bytes.as_ref());
961 let mut added_any = false;
962 for (post_id, post_bytes) in posts_to_add {
963 if !our_post_ids.contains(&post_id) {
964 let len = post_bytes.len();
965 posts_mut.extend_from_slice(&post_bytes);
966 post_bundle.header.encoded_post_ids.push(post_id);
967 post_bundle.header.encoded_post_lengths.push(len);
968 post_bundle.header.encoded_post_healed.insert(post_id);
969 added_any = true;
970 }
971 }
972 post_bundle.encoded_posts_bytes = posts_mut.freeze();
973
974 if added_any {
975 post_bundle.header.time_millis = time_millis;
976 post_bundle.header.num_posts = post_bundle.header.encoded_post_ids.len() as u8;
977 post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
978
979 let new_bytes = post_bundle.to_bytes()?;
980 post_bundle_metadata.num_posts = post_bundle.header.num_posts;
981 post_bundle_metadata.size = post_bundle.header.encoded_post_lengths.iter().sum();
982
983 self.environment.put_post_bundle_bytes(time_millis, &location_id, &new_bytes)?;
984 self.environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, 0)?;
985
986 info!("Healed {} post(s) for location_id={}", post_bundle.header.encoded_post_healed.len(), location_id);
987 }
988
989 self.heal_in_progress.invalidate(&location_id);
990
991 let response = HealPostBundleCommitResponseV1 { accepted: added_any };
992 Ok((PayloadResponseKind::HealPostBundleCommitResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
993 }
994
995 #[allow(non_snake_case)]
996 async fn dispatch_network_payload_x_HealPostBundleFeedbackV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
997 anyhow_assert_eq!(&PayloadRequestKind::HealPostBundleFeedbackV1, &payload_request_kind);
998
999 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
1000 let request = HealPostBundleFeedbackV1::from_bytes(&mut bytes)?;
1001 trace!("received HealPostBundleFeedbackV1 for location_id={}", request.location_id);
1002
1003 let (_, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.location_id, config::REDUNDANT_SERVERS_PER_POST);
1005 if !among_peers_nearer {
1006 let response = HealPostBundleFeedbackResponseV1 { accepted_count: 0 };
1007 return Ok((PayloadResponseKind::HealPostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)));
1008 }
1009
1010 let _post_bundle_lock = self.environment.get_read_lock_for_location_id(&request.location_id);
1012 let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.location_id)?;
1013 let Some(post_bundle_bytes) = post_bundle_bytes
1014 else {
1015 let response = HealPostBundleFeedbackResponseV1 { accepted_count: 0 };
1016 return Ok((PayloadResponseKind::HealPostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)));
1017 };
1018 let post_bundle = EncodedPostBundleV1::from_bytes(Bytes::from_owner(post_bundle_bytes), false)?;
1019
1020 let mut accepted_count: u32 = 0;
1021 for feedback in &request.encoded_post_feedbacks {
1022 if !post_bundle.header.encoded_post_ids.contains(&feedback.post_id) {
1024 continue;
1025 }
1026 self.environment.put_post_feedback_if_more_powerful(time_millis, &request.location_id, feedback)?;
1027 accepted_count += 1;
1028 }
1029
1030 if accepted_count > 0 {
1031 trace!("Accepted {} healed feedback(s) for location_id={}", accepted_count, request.location_id);
1032 }
1033
1034 let response = HealPostBundleFeedbackResponseV1 { accepted_count };
1035 Ok((PayloadResponseKind::HealPostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
1036 }
1037
1038 #[allow(non_snake_case)]
1039 async fn dispatch_network_payload_x_CachePostBundleV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1040 anyhow_assert_eq!(&PayloadRequestKind::CachePostBundleV1, &payload_request_kind);
1041
1042 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
1043 let request = CachePostBundleV1::from_bytes(&mut bytes)?;
1044 trace!("received CachePostBundleV1 for bucket_location={}", request.token.bucket_location);
1045
1046 let peer_self = self.peer_self.read().clone();
1048 if request.token.peer.id != peer_self.id {
1049 anyhow::bail!("CachePostBundleV1: token was not issued by this server");
1050 }
1051 request.token.verify()?;
1052 if request.token.is_expired(time_millis) {
1053 anyhow::bail!("CachePostBundleV1: token has expired");
1054 }
1055
1056 let mut any_accepted = false;
1057 for bundle_bytes in request.encoded_post_bundles {
1058 let parse_result: anyhow::Result<()> = try {
1059 let encoded_post_bundle = EncodedPostBundleV1::from_bytes(bundle_bytes.clone(), true)?;
1060
1061 encoded_post_bundle.verify(&request.token.bucket_location.base_id)?;
1063
1064 let originator_peer_id = encoded_post_bundle.header.peer.id;
1065 let is_sealed = encoded_post_bundle.header.sealed;
1066 if self.post_bundle_cache.on_upload(request.token.bucket_location.location_id, originator_peer_id, bundle_bytes, time_millis, is_sealed) {
1067 any_accepted = true;
1068 }
1069 };
1070 if let Err(e) = &parse_result {
1071 warn!("CachePostBundleV1: failed to parse bundle: {}", e);
1072 }
1073 }
1074 let response = CachePostBundleResponseV1 { accepted: any_accepted };
1075 Ok((PayloadResponseKind::CachePostBundleResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
1076 }
1077
1078 #[allow(non_snake_case)]
1079 async fn dispatch_network_payload_x_CachePostBundleFeedbackV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1080 anyhow_assert_eq!(&PayloadRequestKind::CachePostBundleFeedbackV1, &payload_request_kind);
1081
1082 let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
1083 let request = CachePostBundleFeedbackV1::from_bytes(&mut bytes)?;
1084 trace!("received CachePostBundleFeedbackV1 for bucket_location={}", request.token.bucket_location);
1085
1086 let peer_self = self.peer_self.read().clone();
1088 if request.token.peer.id != peer_self.id {
1089 anyhow::bail!("CachePostBundleFeedbackV1: token was not issued by this server");
1090 }
1091 request.token.verify()?;
1092 if request.token.is_expired(time_millis) {
1093 anyhow::bail!("CachePostBundleFeedbackV1: token has expired");
1094 }
1095
1096 let result: anyhow::Result<bool> = try {
1097 let encoded_post_bundle_feedback = EncodedPostBundleFeedbackV1::from_bytes(request.encoded_post_bundle_feedback_bytes.clone())?;
1098
1099 encoded_post_bundle_feedback.verify()?;
1100
1101 let originator_peer_id = encoded_post_bundle_feedback.header.peer.id;
1102 self.post_bundle_feedback_cache
1104 .on_upload(request.token.bucket_location.location_id, originator_peer_id, request.encoded_post_bundle_feedback_bytes, time_millis, false)
1105 };
1106 let accepted = result.unwrap_or_else(|e| {
1107 warn!("CachePostBundleFeedbackV1: parse error: {}", e);
1108 false
1109 });
1110
1111 let response = CachePostBundleFeedbackResponseV1 { accepted };
1112 Ok((PayloadResponseKind::CachePostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
1113 }
1114
1115 #[allow(non_snake_case)]
1116 async fn dispatch_network_payload_x_FetchUrlPreviewV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1117 anyhow_assert_eq!(&PayloadRequestKind::FetchUrlPreviewV1, &payload_request_kind);
1118
1119 let request = FetchUrlPreviewV1::from_bytes(&mut bytes)?;
1120 trace!("received FetchUrlPreviewV1 for url={}", request.url);
1121
1122 if !request.url.starts_with("https://") {
1125 anyhow::bail!("FetchUrlPreviewV1 SSRF: only https:// URLs are allowed");
1126 }
1127
1128 let host_and_port = request.url["https://".len()..].split(&['/', '?', '#'][..]).next().unwrap_or("");
1130 let host = if host_and_port.starts_with('[') {
1131 host_and_port.trim_start_matches('[').split(']').next().unwrap_or("")
1133 } else {
1134 host_and_port.split(':').next().unwrap_or(host_and_port)
1136 };
1137 if host.is_empty() {
1138 anyhow::bail!("FetchUrlPreviewV1 SSRF: could not extract host from URL");
1139 }
1140 if host.parse::<std::net::IpAddr>().is_ok() {
1141 anyhow::bail!("FetchUrlPreviewV1 SSRF: bare IP addresses are not allowed");
1142 }
1143
1144 let resolved_socket_addrs: Vec<std::net::SocketAddr> = tokio::net::lookup_host((host, 443u16))
1148 .await
1149 .map_err(|e| anyhow::anyhow!("FetchUrlPreviewV1 SSRF: DNS resolution failed for {}: {}", host, e))?
1150 .collect();
1151 if resolved_socket_addrs.is_empty() {
1152 anyhow::bail!("FetchUrlPreviewV1 SSRF: DNS returned no addresses for {}", host);
1153 }
1154 for socket_addr in &resolved_socket_addrs {
1155 let ip = socket_addr.ip();
1156 if is_ssrf_protected_ip(ip) {
1157 anyhow::bail!("FetchUrlPreviewV1 SSRF: {} resolved to protected address {}", host, ip);
1158 }
1159 }
1160
1161 let http_client = reqwest::Client::builder()
1168 .connect_timeout(std::time::Duration::from_secs(1))
1169 .timeout(std::time::Duration::from_secs(3))
1170 .user_agent("hashiverse-preview/1.0")
1171 .resolve_to_addrs(host, &resolved_socket_addrs)
1172 .redirect(reqwest::redirect::Policy::none())
1173 .no_proxy()
1174 .build()?;
1175
1176 const URL_FETCH_MAX_BODY_BYTES: usize = 2 * 1024 * 1024;
1177 let mut http_response = http_client.get(&request.url).send().await?;
1178
1179 if let Some(content_length) = http_response.content_length() {
1181 if content_length > URL_FETCH_MAX_BODY_BYTES as u64 {
1182 anyhow::bail!("FetchUrlPreviewV1: Content-Length {} exceeds {} byte limit", content_length, URL_FETCH_MAX_BODY_BYTES);
1183 }
1184 }
1185 let mut body_bytes = BytesMut::new();
1186 while let Some(chunk) = http_response.chunk().await? {
1187 let remaining = URL_FETCH_MAX_BODY_BYTES - body_bytes.len();
1192 body_bytes.extend_from_slice(&chunk[..chunk.len().min(remaining)]);
1193 if body_bytes.len() >= URL_FETCH_MAX_BODY_BYTES {
1194 break;
1195 }
1196 }
1197 let html = String::from_utf8_lossy(&body_bytes).into_owned();
1198
1199 let preview_data = url_preview::extract_url_preview(&html);
1200
1201 let response = FetchUrlPreviewResponseV1 {
1202 url: if preview_data.canonical_url.is_empty() { request.url } else { preview_data.canonical_url },
1203 title: preview_data.title,
1204 description: preview_data.description,
1205 image_url: preview_data.image_url,
1206 };
1207
1208 Ok((PayloadResponseKind::FetchUrlPreviewResponseV1, BytesGatherer::from_bytes(response.to_bytes()?)))
1209 }
1210
1211 #[allow(non_snake_case)]
1212 async fn dispatch_network_payload_x_TrendingHashtagsFetchV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1213 anyhow_assert_eq!(&PayloadRequestKind::TrendingHashtagsFetchV1, &payload_request_kind);
1214
1215 let request = TrendingHashtagsFetchV1::from_bytes(&mut bytes)?;
1216 trace!("received TrendingHashtagsFetchV1 with limit={}", request.limit);
1217
1218 let time_millis = self.runtime_services.time_provider.current_time_millis();
1219
1220 let cached_response = {
1222 let cache = self.trending_hashtags_response_cache.lock();
1223 match cache.as_ref() {
1224 Some((cached_time, cached_response)) if (time_millis - *cached_time) < MILLIS_IN_SECOND.const_mul(30) => {
1225 Some(cached_response.clone())
1226 }
1227 _ => None,
1228 }
1229 };
1230
1231 let mut response = match cached_response {
1232 Some(mut cached) => {
1233 cached.trending_hashtags.truncate(request.limit as usize);
1234 cached
1235 }
1236 None => {
1237 let mut trending_hashtags: Vec<TrendingHashtagV1> = self.trending_hashtags.iter()
1239 .map(|(hashtag, hll)| TrendingHashtagV1 {
1240 hashtag: hashtag.as_ref().clone(),
1241 count: hll.count(),
1242 })
1243 .filter(|entry| entry.count > 0)
1244 .collect();
1245
1246 trending_hashtags.sort_by_key(|entry| std::cmp::Reverse(entry.count));
1247
1248 let full_response = TrendingHashtagsFetchResponseV1 { trending_hashtags };
1249
1250 {
1252 let mut cache = self.trending_hashtags_response_cache.lock();
1253 *cache = Some((time_millis, full_response.clone()));
1254 }
1255
1256 let mut truncated_response = full_response;
1257 truncated_response.trending_hashtags.truncate(request.limit as usize);
1258 truncated_response
1259 }
1260 };
1261
1262 top_up_trending_hashtags_with_fallback(&mut response.trending_hashtags, request.limit, TRENDING_HASHTAGS_FALLBACK);
1263
1264 Ok((PayloadResponseKind::TrendingHashtagsFetchResponseV1, BytesGatherer::from_bytes(response.to_bytes()?)))
1265 }
1266
1267 #[allow(non_snake_case)]
1268 async fn dispatch_network_payload_x_PeerStatsRequestV1(&self, _cancellation_token: CancellationToken, pow: Option<PeerPow>, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
1269 anyhow_assert_eq!(&PayloadRequestKind::PeerStatsRequestV1, &payload_request_kind);
1270
1271 let _request = PeerStatsRequestV1::from_bytes(&bytes)?;
1272
1273 let pow = pow.ok_or_else(|| anyhow::anyhow!("pow required for PeerStatsRequestV1"))?;
1274 if pow.pow < config::POW_MINIMUM_PER_PEER_STATS {
1275 anyhow::bail!("Insufficient pow for PeerStatsRequestV1: {} < {}", pow.pow, config::POW_MINIMUM_PER_PEER_STATS);
1276 }
1277
1278 let time_millis = self.runtime_services.time_provider.current_time_millis();
1279
1280 let cached_response = {
1283 let cache = self.peer_stats_response_cache.lock();
1284 match cache.as_ref() {
1285 Some((cached_time, cached_response)) if (time_millis - *cached_time) < MILLIS_IN_SECOND.const_mul(60) => Some(cached_response.clone()),
1286 _ => None,
1287 }
1288 };
1289
1290 let response = match cached_response {
1291 Some(cached) => cached,
1292 None => {
1293 let doc = serde_json::json!({
1294 "version": env!("CARGO_PKG_VERSION"),
1295 "requests": request_counts_subtree(&self.request_counters),
1296 "system": system_stats_subtree(),
1297 "kademlia": kademlia_stats_subtree(&self.kademlia.read()),
1298 "environment": environment_stats_subtree(&self.environment),
1299 });
1300
1301 let json_bytes = serde_json::to_vec(&doc)?;
1302 let json_compressed = compression::compress_for_speed(&json_bytes)?.to_bytes();
1303
1304 let signing_input = PeerStatsResponseV1::signing_input(time_millis, &json_compressed);
1305 let signature = signing::sign(&self.server_id.keys.signature_key, &signing_input);
1306
1307 let response = PeerStatsResponseV1 {
1308 peer: self.peer_self.read().clone(),
1309 timestamp: time_millis,
1310 json_compressed,
1311 signature,
1312 };
1313
1314 *self.peer_stats_response_cache.lock() = Some((time_millis, response.clone()));
1315 response
1316 }
1317 };
1318
1319 Ok((PayloadResponseKind::PeerStatsResponseV1, BytesGatherer::from_bytes(response.to_bytes()?)))
1320 }
1321}
1322
1323#[cfg(test)]
1324mod tests {
1325 use super::*;
1326
1327 fn make_trending_hashtag(hashtag: &str, count: u64) -> TrendingHashtagV1 {
1328 TrendingHashtagV1 { hashtag: hashtag.to_string(), count }
1329 }
1330
/// With no real entries, every fallback hashtag is added with a count of 0.
#[test]
fn top_up_adds_fallback_when_empty() {
    let mut entries: Vec<TrendingHashtagV1> = Vec::new();
    top_up_trending_hashtags_with_fallback(&mut entries, 5, &["#hashiverse", "#news"]);
    assert_eq!(entries.len(), 2);
    for (entry, expected_hashtag) in entries.iter().zip(["#hashiverse", "#news"]) {
        assert_eq!(entry.hashtag, expected_hashtag);
        assert_eq!(entry.count, 0);
    }
}
1341
/// A limit below the fallback list length caps how many fallbacks are added.
#[test]
fn top_up_respects_limit_smaller_than_fallback_list() {
    let mut entries: Vec<TrendingHashtagV1> = Vec::new();
    let fallback = ["#hashiverse", "#news"];
    top_up_trending_hashtags_with_fallback(&mut entries, 1, &fallback);
    assert_eq!(entries.len(), 1);
    assert_eq!(entries[0].hashtag, "#hashiverse");
}
1349
/// Fallback hashtags appear in the order in which they were supplied.
#[test]
fn top_up_preserves_fallback_order() {
    let mut entries: Vec<TrendingHashtagV1> = Vec::new();
    top_up_trending_hashtags_with_fallback(&mut entries, 3, &["#first", "#second", "#third"]);
    for (position, expected_hashtag) in ["#first", "#second", "#third"].into_iter().enumerate() {
        assert_eq!(entries[position].hashtag, expected_hashtag);
    }
}
1358
/// A list that already holds `limit` entries is left untouched.
#[test]
fn top_up_is_noop_when_already_at_limit() {
    let rust_entry = make_trending_hashtag("#rust", 10);
    let golang_entry = make_trending_hashtag("#golang", 5);
    let mut entries = vec![rust_entry, golang_entry];
    top_up_trending_hashtags_with_fallback(&mut entries, 2, &["#hashiverse", "#news"]);
    assert_eq!(entries.len(), 2);
    assert_eq!(entries[0].hashtag, "#rust");
    assert_eq!(entries[1].hashtag, "#golang");
}
1367
/// Real entries stay first with their counts; fallbacks fill the remaining slots with count 0.
#[test]
fn top_up_partially_fills_when_real_trending_exists() {
    let mut entries = vec![make_trending_hashtag("#rust", 10)];
    top_up_trending_hashtags_with_fallback(&mut entries, 3, &["#hashiverse", "#news"]);
    assert_eq!(entries.len(), 3);
    let expected: [(&str, u64); 3] = [("#rust", 10), ("#hashiverse", 0), ("#news", 0)];
    for (entry, (expected_hashtag, expected_count)) in entries.iter().zip(expected) {
        assert_eq!(entry.hashtag, expected_hashtag);
        assert_eq!(entry.count, expected_count);
    }
}
1380
/// A fallback that is already present as a real entry is not added a second time.
#[test]
fn top_up_skips_fallback_already_present_exact_match() {
    let mut entries = vec![make_trending_hashtag("#hashiverse", 42)];
    top_up_trending_hashtags_with_fallback(&mut entries, 3, &["#hashiverse", "#news"]);
    assert_eq!(entries.len(), 2);

    let real_entry = &entries[0];
    assert_eq!(real_entry.hashtag, "#hashiverse");
    assert_eq!(real_entry.count, 42, "real trending entry must not be overwritten by filler");

    let filler_entry = &entries[1];
    assert_eq!(filler_entry.hashtag, "#news");
    assert_eq!(filler_entry.count, 0);
}
1391
/// "HashiVerse" and "#hashiverse" count as the same hashtag when de-duplicating.
#[test]
fn top_up_dedup_is_case_insensitive_and_prefix_agnostic() {
    let mut entries = vec![make_trending_hashtag("HashiVerse", 7)];
    let fallback = ["#hashiverse", "#news"];
    top_up_trending_hashtags_with_fallback(&mut entries, 3, &fallback);
    assert_eq!(entries.len(), 2);
    assert_eq!(entries[0].hashtag, "HashiVerse");
    assert_eq!(entries[1].hashtag, "#news");
}
1401
/// An empty fallback list adds nothing.
#[test]
fn top_up_with_empty_fallback_is_noop() {
    let mut entries: Vec<TrendingHashtagV1> = Vec::new();
    top_up_trending_hashtags_with_fallback(&mut entries, 5, &[]);
    assert_eq!(entries.len(), 0);
}
1408
/// A limit of 0 adds nothing, even when fallbacks are available.
#[test]
fn top_up_handles_zero_limit() {
    let mut entries: Vec<TrendingHashtagV1> = Vec::new();
    top_up_trending_hashtags_with_fallback(&mut entries, 0, &["#hashiverse", "#news"]);
    assert_eq!(entries.len(), 0);
}
1415
/// When the limit exceeds the fallback list, the result holds just the fallbacks.
#[test]
fn top_up_exhausts_fallback_without_reaching_limit() {
    let mut entries: Vec<TrendingHashtagV1> = Vec::new();
    let fallback = ["#hashiverse", "#news"];
    top_up_trending_hashtags_with_fallback(&mut entries, 10, &fallback);
    assert_eq!(entries.len(), 2, "should stop at the end of the fallback list, not pad further");
}
1423
1424 mod peer_stats {
1425 use super::*;
1426 use crate::environment::mem_environment_store::MemEnvironmentFactory;
1427 use crate::environment::environment::EnvironmentFactory;
1428 use crate::server::args::Args;
1429 use crate::server::hashiverse_server::HashiverseServer;
1430 use hashiverse_lib::protocol::payload::payload::{PAYLOAD_REQUEST_KIND_COUNT, PeerStatsRequestV1, PeerStatsResponseV1};
1431 use hashiverse_lib::protocol::peer::PeerPow;
1432 use hashiverse_lib::tools::compression;
1433 use hashiverse_lib::tools::pow_generator::single_threaded_pow_generator::SingleThreadedPowGenerator;
1434 use hashiverse_lib::tools::runtime_services::RuntimeServices;
1435 use hashiverse_lib::tools::time::TimeMillis;
1436 use hashiverse_lib::tools::time_provider::time_provider::RealTimeProvider;
1437 use hashiverse_lib::tools::types::{Pow, VerificationKey};
1438 use hashiverse_lib::transport::mem_transport::MemTransportFactory;
1439 use std::sync::Arc;
1440 use std::sync::atomic::Ordering;
1441
1442 async fn make_server() -> anyhow::Result<Arc<HashiverseServer>> {
1443 let time_provider = Arc::new(RealTimeProvider);
1444 let transport_factory = MemTransportFactory::default();
1445 let pow_generator = Arc::new(SingleThreadedPowGenerator::new());
1446 let runtime_services = Arc::new(RuntimeServices { time_provider, transport_factory, pow_generator });
1447 let environment_factory = Arc::new(MemEnvironmentFactory::new(""));
1448 let args = Args::default_for_testing();
1449 HashiverseServer::new(runtime_services, environment_factory, args).await
1450 }
1451
1452 fn synthetic_pow(pow: Pow) -> PeerPow {
1456 let mut peer_pow = PeerPow::zero();
1457 peer_pow.pow = pow;
1458 peer_pow
1459 }
1460
1461 fn empty_request_bytes() -> Bytes {
1462 PeerStatsRequestV1 {}.to_bytes().expect("PeerStatsRequestV1 must serialise")
1463 }
1464
1465 fn decode_doc(response: &PeerStatsResponseV1) -> serde_json::Value {
1466 let bytes = compression::decompress(&response.json_compressed).expect("decompress doc").to_bytes();
1467 serde_json::from_slice(&bytes).expect("doc must be valid JSON")
1468 }
1469
1470 #[tokio::test]
1471 async fn rejects_insufficient_pow() {
1472 let server = make_server().await.expect("server must start");
1473 let result = server
1474 .dispatch_network_payload_x_PeerStatsRequestV1(
1475 CancellationToken::new(),
1476 Some(synthetic_pow(Pow(config::POW_MINIMUM_PER_PEER_STATS.0.saturating_sub(1)))),
1477 PayloadRequestKind::PeerStatsRequestV1,
1478 empty_request_bytes(),
1479 )
1480 .await;
1481 assert!(result.is_err(), "expected insufficient PoW to be rejected");
1482 }
1483
1484 #[tokio::test]
1485 async fn returns_response_with_expected_top_level_keys() {
1486 let server = make_server().await.expect("server must start");
1487 let (response_kind, gatherer) = server
1488 .dispatch_network_payload_x_PeerStatsRequestV1(
1489 CancellationToken::new(),
1490 Some(synthetic_pow(config::POW_MINIMUM_PER_PEER_STATS)),
1491 PayloadRequestKind::PeerStatsRequestV1,
1492 empty_request_bytes(),
1493 )
1494 .await
1495 .expect("handler must succeed at threshold pow");
1496 assert_eq!(response_kind, PayloadResponseKind::PeerStatsResponseV1);
1497
1498 let response_bytes = gatherer.to_bytes();
1499 let response = PeerStatsResponseV1::from_bytes(&response_bytes).expect("response must decode");
1500 let doc = decode_doc(&response);
1501
1502 assert!(doc.get("version").and_then(|v| v.as_str()).map(|s| !s.is_empty()).unwrap_or(false), "version must be a non-empty string");
1503 assert_eq!(doc["version"].as_str().unwrap(), env!("CARGO_PKG_VERSION"));
1504 assert!(doc.get("requests").is_some(), "requests subtree missing");
1505 assert!(doc.get("system").is_some(), "system subtree missing");
1506 assert!(doc.get("kademlia").is_some(), "kademlia subtree missing");
1507 assert!(doc.get("environment").is_some(), "environment subtree missing");
1508
1509 for key in ["memory_total_bytes", "memory_free_bytes", "disk_total_bytes", "disk_free_bytes", "load_1m", "load_5m", "load_15m"] {
1510 assert!(doc["system"].get(key).map(|v| v.is_number()).unwrap_or(false), "system.{key} must be a number");
1511 }
1512 for key in ["post_bundle_count", "post_bundle_feedback_count", "post_bundle_total_bytes"] {
1513 assert!(doc["environment"].get(key).map(|v| v.is_number()).unwrap_or(false), "environment.{key} must be a number");
1514 }
1515 }
1516
1517 #[tokio::test]
1518 async fn counters_reflect_recorded_dispatches() {
1519 let server = make_server().await.expect("server must start");
1520
1521 for _ in 0..7 {
1525 server.request_counters[PayloadRequestKind::PingV1 as usize].fetch_add(1, Ordering::Relaxed);
1526 }
1527
1528 let (_, gatherer) = server
1529 .dispatch_network_payload_x_PeerStatsRequestV1(
1530 CancellationToken::new(),
1531 Some(synthetic_pow(config::POW_MINIMUM_PER_PEER_STATS)),
1532 PayloadRequestKind::PeerStatsRequestV1,
1533 empty_request_bytes(),
1534 )
1535 .await
1536 .expect("handler must succeed");
1537 let response = PeerStatsResponseV1::from_bytes(&gatherer.to_bytes()).expect("response must decode");
1538 let doc = decode_doc(&response);
1539 assert_eq!(doc["requests"]["PingV1"].as_u64(), Some(7));
1540 }
1541
1542 #[tokio::test]
1543 async fn cache_returns_byte_identical_response_within_ttl() {
1544 let server = make_server().await.expect("server must start");
1545 let pow = synthetic_pow(config::POW_MINIMUM_PER_PEER_STATS);
1546
1547 let (_, gatherer_a) = server
1548 .dispatch_network_payload_x_PeerStatsRequestV1(CancellationToken::new(), Some(pow.clone()), PayloadRequestKind::PeerStatsRequestV1, empty_request_bytes())
1549 .await
1550 .expect("first call must succeed");
1551 let bytes_a = gatherer_a.to_bytes();
1552
1553 server.request_counters[PayloadRequestKind::PingV1 as usize].fetch_add(99, Ordering::Relaxed);
1556
1557 let (_, gatherer_b) = server
1558 .dispatch_network_payload_x_PeerStatsRequestV1(CancellationToken::new(), Some(pow), PayloadRequestKind::PeerStatsRequestV1, empty_request_bytes())
1559 .await
1560 .expect("second call must succeed");
1561 let bytes_b = gatherer_b.to_bytes();
1562
1563 assert_eq!(bytes_a, bytes_b, "cached response should be byte-identical across the TTL");
1564 }
1565
1566 #[tokio::test]
1567 async fn signature_verifies_and_fails_on_tamper() {
1568 let server = make_server().await.expect("server must start");
1569 let (_, gatherer) = server
1570 .dispatch_network_payload_x_PeerStatsRequestV1(
1571 CancellationToken::new(),
1572 Some(synthetic_pow(config::POW_MINIMUM_PER_PEER_STATS)),
1573 PayloadRequestKind::PeerStatsRequestV1,
1574 empty_request_bytes(),
1575 )
1576 .await
1577 .expect("handler must succeed");
1578 let response = PeerStatsResponseV1::from_bytes(&gatherer.to_bytes()).expect("response must decode");
1579
1580 let verification_key = VerificationKey::from_bytes(&response.peer.verification_key_bytes).expect("verification key must decode");
1581 let signing_input = PeerStatsResponseV1::signing_input(response.timestamp, &response.json_compressed);
1582 signing::verify(&verification_key, &response.signature, &signing_input).expect("signature must verify against transmitted bytes");
1583
1584 let mut tampered = response.json_compressed.to_vec();
1586 let tamper_index = tampered.len() / 2;
1587 tampered[tamper_index] ^= 0xff;
1588 let tampered_signing_input = PeerStatsResponseV1::signing_input(response.timestamp, &tampered);
1589 assert!(signing::verify(&verification_key, &response.signature, &tampered_signing_input).is_err(), "verification must fail when json_compressed is tampered");
1590
1591 let bumped_signing_input = PeerStatsResponseV1::signing_input(TimeMillis(response.timestamp.0 + 1), &response.json_compressed);
1593 assert!(signing::verify(&verification_key, &response.signature, &bumped_signing_input).is_err(), "verification must fail when timestamp is mutated");
1594 }
1595
/// `request_counts_subtree` must return a JSON object with exactly
/// `PAYLOAD_REQUEST_KIND_COUNT` entries when given an all-zero counter array.
///
/// NOTE(review): presumably `PAYLOAD_REQUEST_KIND_COUNT` equals the number of
/// `PayloadRequestKind` variants — confirm at its definition.
#[test]
fn request_counts_subtree_covers_every_variant() {
    use std::sync::atomic::AtomicU64;

    let zeroed_counters: [AtomicU64; PAYLOAD_REQUEST_KIND_COUNT] = std::array::from_fn(|_| AtomicU64::new(0));

    let entry_count = request_counts_subtree(&zeroed_counters)
        .as_object()
        .expect("request_counts subtree must be an object")
        .len();

    assert_eq!(entry_count, PAYLOAD_REQUEST_KIND_COUNT);
}
1605 }
1606}
1607