use crate::environment::environment::PostBundleMetadata;
use crate::server::hashiverse_server::HashiverseServer;
use crate::tools::tools::is_ssrf_protected_ip;
use bytes::{Bytes, BytesMut};
use hashiverse_lib::anyhow_assert_eq;
use hashiverse_lib::protocol::payload::payload::{
AnnounceResponseV1, AnnounceV1, BootstrapResponseV1, CachePostBundleFeedbackResponseV1, CachePostBundleFeedbackV1, CachePostBundleResponseV1, CachePostBundleV1, ErrorResponseV1, FetchUrlPreviewResponseV1, FetchUrlPreviewV1,
GetPostBundleFeedbackResponseV1, GetPostBundleFeedbackV1, GetPostBundleResponseV1, GetPostBundleV1, HealPostBundleClaimResponseV1, HealPostBundleClaimTokenV1, HealPostBundleClaimV1, HealPostBundleCommitResponseV1, HealPostBundleCommitV1,
HealPostBundleFeedbackResponseV1, HealPostBundleFeedbackV1, PayloadRequestKind, PayloadResponseKind, PingResponseV1, PeerStatsRequestV1, PeerStatsResponseV1, SubmitPostClaimResponseV1, SubmitPostClaimTokenV1, SubmitPostClaimV1,
SubmitPostCommitResponseV1, SubmitPostCommitTokenV1, SubmitPostCommitV1, SubmitPostFeedbackResponseV1, SubmitPostFeedbackV1, TrendingHashtagV1, TrendingHashtagsFetchResponseV1, TrendingHashtagsFetchV1,
};
use hashiverse_lib::protocol::peer::PeerPow;
use hashiverse_lib::protocol::posting::amplification::get_minimum_post_pow;
use hashiverse_lib::protocol::posting::encoded_post::EncodedPostV1;
use hashiverse_lib::protocol::posting::encoded_post_bundle::{EncodedPostBundleHeaderV1, EncodedPostBundleV1};
use hashiverse_lib::protocol::posting::encoded_post_bundle_feedback::{EncodedPostBundleFeedbackHeaderV1, EncodedPostBundleFeedbackV1};
use hashiverse_lib::protocol::rpc::rpc_request::RpcRequestPacketRx;
use hashiverse_lib::protocol::rpc::rpc_response::{RpcResponsePacketTx, RpcResponsePacketTxFlags};
use hashiverse_lib::tools::buckets::{BucketLocation, BucketType, BUCKET_DURATIONS};
use hashiverse_lib::tools::time::{TimeMillis, MILLIS_IN_SECOND};
use hashiverse_lib::tools::hyper_log_log::HyperLogLog;
use hashiverse_lib::tools::types::{Id, Signature};
use hashiverse_lib::tools::{hashing, url_preview};
use hashiverse_lib::tools::{compression, config, json, signing, BytesGatherer};
use hashiverse_lib::transport::transport::IncomingRequest;
use log::{info, trace, warn};
use std::collections::HashSet;
use std::sync::atomic::Ordering;
use crate::server::stats::{environment_stats_subtree, kademlia_stats_subtree, request_counts_subtree, system_stats_subtree};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
const TRENDING_HASHTAGS_FALLBACK: &[&str] = &["hashiverse", "news"];
fn normalise_hashtag(hashtag: &str) -> String {
let lowercased = hashtag.to_lowercase();
match lowercased.strip_prefix('#') {
Some(stripped) => stripped.to_string(),
None => lowercased,
}
}
fn top_up_trending_hashtags_with_fallback(trending_hashtags: &mut Vec<TrendingHashtagV1>, limit: u16, fallback_hashtags: &[&str]) {
let target_length = limit as usize;
if trending_hashtags.len() >= target_length {
return;
}
let mut existing_normalised_hashtags: HashSet<String> = trending_hashtags.iter()
.map(|entry| normalise_hashtag(&entry.hashtag))
.collect();
for fallback_hashtag in fallback_hashtags {
if trending_hashtags.len() >= target_length {
break;
}
let normalised_fallback_hashtag = normalise_hashtag(fallback_hashtag);
if existing_normalised_hashtags.contains(&normalised_fallback_hashtag) {
continue;
}
trending_hashtags.push(TrendingHashtagV1 {
hashtag: (*fallback_hashtag).to_string(),
count: 0,
});
existing_normalised_hashtags.insert(normalised_fallback_hashtag);
}
}
impl HashiverseServer {
pub async fn wrap_and_dispatch_network_envelopes(&self, cancellation_token: CancellationToken, mut rx: mpsc::Receiver<IncomingRequest>) -> Result<(), anyhow::Error> {
loop {
tokio::select! {
_ = cancellation_token.cancelled() => { break },
receipt = rx.recv() => {
match receipt {
Some(incoming) => {
let result = self.wrap_and_dispatch_network_envelope(cancellation_token.clone(), &incoming).await;
match result {
Ok(bytes) => {
let result = incoming.reply.send(bytes);
if result.is_err() { warn!("failed to send reply"); }
},
Err(e) => {
warn!("failed to process packet from {}: {}", incoming.caller_address, e);
incoming.report_bad_request();
drop(incoming.reply);
},
}
},
None => {
warn!("channel closed");
break;
}
}
}
}
}
Ok(())
}
async fn wrap_and_dispatch_network_envelope(&self, cancellation_token: CancellationToken, incoming: &IncomingRequest) -> anyhow::Result<BytesGatherer> {
let caller_address = incoming.caller_address.as_str();
let current_time_millis = self.runtime_services.time_provider.current_time_millis();
let rpc_request_packet_rx = RpcRequestPacketRx::decode(¤t_time_millis, &self.server_id.keys.verification_key_bytes, &self.server_id.keys.pq_commitment_bytes, incoming.bytes.clone())?;
self.request_counters[rpc_request_packet_rx.payload_request_kind.clone() as usize].fetch_add(1, Ordering::Relaxed);
{
if self.seen_salts.contains_key(&rpc_request_packet_rx.pow_salt) {
anyhow::bail!("replay detected: salt already seen");
}
self.seen_salts.insert(rpc_request_packet_rx.pow_salt, ());
}
let pow_content_hash = rpc_request_packet_rx.pow_content_hash;
let dispatch_result: anyhow::Result<BytesGatherer> = try {
let pow = match rpc_request_packet_rx.pow_server_known {
true => {
let (pow, improved_pow_current_day, improved_pow_current_month) = {
let peer_self = self.peer_self.read(); let pow = PeerPow::new(
rpc_request_packet_rx.pow_sponsor_id,
&peer_self.verification_key_bytes,
&peer_self.pq_commitment_bytes,
rpc_request_packet_rx.pow_timestamp,
rpc_request_packet_rx.pow_content_hash,
rpc_request_packet_rx.pow_salt,
)?;
let improved_pow_current_day = pow.pow_decayed_day(current_time_millis) > peer_self.pow_current_day.pow_decayed_day(current_time_millis);
let improved_pow_current_month = pow.pow_decayed_month(current_time_millis) > peer_self.pow_current_month.pow_decayed_month(current_time_millis);
(pow, improved_pow_current_day, improved_pow_current_month)
};
if improved_pow_current_day || improved_pow_current_month {
let mut peer_self = self.peer_self.write(); if improved_pow_current_day {
trace!("pow_current_day upgraded {} -> {}", peer_self.pow_current_day, pow);
peer_self.pow_current_day = pow.clone();
}
if improved_pow_current_month {
trace!("pow_current_month upgraded {} -> {}", peer_self.pow_current_month, pow);
peer_self.pow_current_month = pow.clone();
}
peer_self.sign(self.runtime_services.time_provider.as_ref(), &self.server_id.keys.signature_key)?;
}
Some(pow)
}
false => {
match rpc_request_packet_rx.payload_request_kind {
PayloadRequestKind::BootstrapV1 => {}
_ => anyhow::bail!("Anonymous pow not allowed for {}", rpc_request_packet_rx.payload_request_kind),
}
None
}
};
let (compress_response, payload_response_kind, payload) = self.dispatch_network_envelope(cancellation_token, pow, rpc_request_packet_rx).await?;
let response_flags = match compress_response {
true => RpcResponsePacketTxFlags::COMPRESSED,
false => RpcResponsePacketTxFlags::empty(),
};
RpcResponsePacketTx::encode(
&self.server_id.keys.signature_key,
&self.server_id.keys.verification_key_bytes,
&self.server_id.keys.pq_commitment_bytes,
&self.server_id.sponsor_id,
&self.server_id.timestamp,
&self.server_id.hash,
&self.server_id.salt,
&pow_content_hash,
response_flags,
payload_response_kind,
payload,
)?
};
match dispatch_result {
Ok(results) => Ok(results),
Err(e) => {
warn!("failed to dispatch packet from {}: {}", caller_address, e);
incoming.report_bad_request();
let payload_response_kind = PayloadResponseKind::ErrorResponseV1;
let response = ErrorResponseV1 { code: 0, message: e.to_string() };
let payload = BytesGatherer::from_bytes(json::struct_to_bytes(&response)?);
RpcResponsePacketTx::encode(
&self.server_id.keys.signature_key,
&self.server_id.keys.verification_key_bytes,
&self.server_id.keys.pq_commitment_bytes,
&self.server_id.sponsor_id,
&self.server_id.timestamp,
&self.server_id.hash,
&self.server_id.salt,
&pow_content_hash,
RpcResponsePacketTxFlags::COMPRESSED,
payload_response_kind,
payload,
)
}
}
}
async fn dispatch_network_envelope(&self, cancellation_token: CancellationToken, pow: Option<PeerPow>, rpc_request_packet_rx: RpcRequestPacketRx) -> anyhow::Result<(bool, PayloadResponseKind, BytesGatherer)> {
let compress_response = match rpc_request_packet_rx.payload_request_kind {
PayloadRequestKind::GetPostBundleV1 => false, PayloadRequestKind::CachePostBundleV1 => false, _ => true,
};
let (payload_response_kind, payload) = match rpc_request_packet_rx.payload_request_kind {
PayloadRequestKind::ErrorV1 => {
anyhow::bail!("Received ErrorV1");
}
PayloadRequestKind::PingV1 => self.dispatch_network_payload_x_PingV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
PayloadRequestKind::BootstrapV1 => self.dispatch_network_payload_x_BootstrapV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
PayloadRequestKind::AnnounceV1 => self.dispatch_network_payload_x_AnnounceV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
PayloadRequestKind::GetPostBundleV1 => self.dispatch_network_payload_x_GetPostBundleV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
PayloadRequestKind::GetPostBundleFeedbackV1 => { self.dispatch_network_payload_x_GetPostBundleFeedbackV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
PayloadRequestKind::SubmitPostClaimV1 => { self.dispatch_network_payload_x_SubmitPostClaimV1(cancellation_token, pow, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
PayloadRequestKind::SubmitPostCommitV1 => { self.dispatch_network_payload_x_SubmitPostCommitV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
PayloadRequestKind::SubmitPostFeedbackV1 => { self.dispatch_network_payload_x_SubmitPostFeedbackV1(cancellation_token, pow, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
PayloadRequestKind::HealPostBundleClaimV1 => { self.dispatch_network_payload_x_HealPostBundleClaimV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
PayloadRequestKind::HealPostBundleCommitV1 => { self.dispatch_network_payload_x_HealPostBundleCommitV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
PayloadRequestKind::HealPostBundleFeedbackV1 => { self.dispatch_network_payload_x_HealPostBundleFeedbackV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
PayloadRequestKind::CachePostBundleV1 => self.dispatch_network_payload_x_CachePostBundleV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
PayloadRequestKind::CachePostBundleFeedbackV1 => { self.dispatch_network_payload_x_CachePostBundleFeedbackV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await? }
PayloadRequestKind::FetchUrlPreviewV1 => self.dispatch_network_payload_x_FetchUrlPreviewV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
PayloadRequestKind::TrendingHashtagsFetchV1 => self.dispatch_network_payload_x_TrendingHashtagsFetchV1(cancellation_token, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
PayloadRequestKind::PeerStatsRequestV1 => self.dispatch_network_payload_x_PeerStatsRequestV1(cancellation_token, pow, rpc_request_packet_rx.payload_request_kind, rpc_request_packet_rx.bytes).await?,
};
Ok((compress_response, payload_response_kind, payload))
}
#[allow(non_snake_case)]
async fn dispatch_network_payload_x_PingV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, _bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
anyhow_assert_eq!(&PayloadRequestKind::PingV1, &payload_request_kind);
let peer = self.peer_self.read().clone();
let json = json::struct_to_bytes(&PingResponseV1 { peer })?;
Ok((PayloadResponseKind::PingResponseV1, BytesGatherer::from_bytes(json)))
}
#[allow(non_snake_case)]
async fn dispatch_network_payload_x_BootstrapV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, _bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
anyhow_assert_eq!(&PayloadRequestKind::BootstrapV1, &payload_request_kind);
let peers_random = self.kademlia.read().get_peers_random(config::BOOTSTRAP_V1_NUM_PEERS);
let json = json::struct_to_bytes(&BootstrapResponseV1 { peers_random })?;
Ok((PayloadResponseKind::BootstrapResponseV1, BytesGatherer::from_bytes(json)))
}
#[allow(non_snake_case)]
async fn dispatch_network_payload_x_AnnounceV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
anyhow_assert_eq!(&PayloadRequestKind::AnnounceV1, &payload_request_kind);
let request = json::bytes_to_struct::<AnnounceV1>(&bytes)?;
let peer = request.peer_self;
let peer_id = peer.id;
self.add_potential_peer_to_kademlia(peer, self.runtime_services.time_provider.as_ref().current_time_millis()).await;
let (peers_nearest, _) = self.kademlia.read().get_peers_for_key(&peer_id, config::ANNOUNCE_V1_NUM_PEERS);
let json = json::struct_to_bytes(&AnnounceResponseV1 {
peer_self: self.peer_self.read().clone(),
peers_nearest,
})?;
Ok((PayloadResponseKind::AnnounceResponseV1, BytesGatherer::from_bytes(json)))
}
#[allow(non_snake_case)]
async fn dispatch_network_payload_x_GetPostBundleV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
anyhow_assert_eq!(&PayloadRequestKind::GetPostBundleV1, &payload_request_kind);
let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
let request = json::bytes_to_struct::<GetPostBundleV1>(&bytes)?;
trace!("received GetPostBundleV1: bucket_location={}", request.bucket_location);
request.bucket_location.validate()?;
{
for peer in request.peers_visited {
self.add_potential_peer_to_kademlia(peer, time_millis).await;
}
}
let peer_self = self.peer_self.read().clone();
let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.bucket_location.location_id, config::REDUNDANT_SERVERS_PER_POST);
if !among_peers_nearer {
warn!("I am not in peers_nearer {}", peer_self);
}
let post_bundle = match among_peers_nearer {
true => {
let _post_bundle_lock = self.environment.get_write_lock_for_location_id(&request.bucket_location.location_id);
let mut encoded_post_bundle_bytes: Option<Bytes> = None;
let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
if let Some(mut post_bundle_metadata) = post_bundle_metadata {
encoded_post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.bucket_location.location_id)?;
if !post_bundle_metadata.sealed {
let sealed = time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;
if sealed {
if let Some(encoded_post_bundle_bytes_old) = encoded_post_bundle_bytes {
let mut encoded_post_bundle = EncodedPostBundleV1::from_bytes(encoded_post_bundle_bytes_old, true)?;
encoded_post_bundle.header.time_millis = time_millis;
encoded_post_bundle.header.sealed = true;
encoded_post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
let encoded_post_bundle_bytes_new = encoded_post_bundle.to_bytes()?;
self.environment.put_post_bundle_bytes(time_millis, &request.bucket_location.location_id, &encoded_post_bundle_bytes_new)?;
encoded_post_bundle_bytes = Some(encoded_post_bundle_bytes_new);
}
post_bundle_metadata.sealed = true;
self.environment.put_post_bundle_metadata(time_millis, &request.bucket_location.location_id, &post_bundle_metadata, 0)?;
}
else {
if let Some(encoded_post_bundle_bytes_old) = encoded_post_bundle_bytes {
let mut encoded_post_bundle = EncodedPostBundleV1::from_bytes(encoded_post_bundle_bytes_old, true)?;
encoded_post_bundle.header.time_millis = time_millis;
encoded_post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
let encoded_post_bundle_bytes_new = encoded_post_bundle.to_bytes()?;
encoded_post_bundle_bytes = Some(encoded_post_bundle_bytes_new);
}
}
}
};
if encoded_post_bundle_bytes.is_none() {
let sealed = time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;
let mut header = EncodedPostBundleHeaderV1 {
time_millis,
location_id: request.bucket_location.location_id,
overflowed: false,
sealed,
num_posts: 0,
encoded_post_ids: vec![],
encoded_post_lengths: vec![],
encoded_post_healed: HashSet::new(),
peer: peer_self.clone(),
signature: Signature::zero(),
};
header.signature_generate(&self.server_id.keys.signature_key)?;
let encoded_post_bundle = EncodedPostBundleV1 { header, encoded_posts_bytes: Bytes::new() };
encoded_post_bundle_bytes = Some(encoded_post_bundle.to_bytes()?);
}
encoded_post_bundle_bytes
}
false => None,
};
let cache_result = self.post_bundle_cache.on_get(&request.bucket_location, &request.already_retrieved_peer_ids, &peer_self, &self.server_id, time_millis);
let get_post_bundle_response = GetPostBundleResponseV1 {
peers_nearer,
cache_request_token: cache_result.cache_request_token,
post_bundles_cached: cache_result.cached_items,
post_bundle,
};
Ok((PayloadResponseKind::GetPostBundleResponseV1, get_post_bundle_response.to_bytes_gatherer()?))
}
#[allow(non_snake_case)]
async fn dispatch_network_payload_x_GetPostBundleFeedbackV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
anyhow_assert_eq!(&PayloadRequestKind::GetPostBundleFeedbackV1, &payload_request_kind);
let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
let request = json::bytes_to_struct::<GetPostBundleFeedbackV1>(&bytes)?;
trace!("received GetPostBundleFeedbackV1");
{
for peer in request.peers_visited {
self.add_potential_peer_to_kademlia(peer, time_millis).await;
}
}
let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.bucket_location.location_id, config::REDUNDANT_SERVERS_PER_POST);
let mut post_bundle_encoded_feedbacks_bytes: Option<Bytes> = None;
if among_peers_nearer {
let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
if post_bundle_metadata.is_some() {
post_bundle_encoded_feedbacks_bytes = Some(self.environment.get_post_bundle_encoded_post_feedbacks_bytes(time_millis, &request.bucket_location.location_id)?);
}
}
let peer_self = self.peer_self.read().clone();
let encoded_post_bundle_feedback = match post_bundle_encoded_feedbacks_bytes {
Some(feedbacks_bytes) => {
let feedbacks_bytes_hash = hashing::hash(feedbacks_bytes.as_ref());
let mut header = EncodedPostBundleFeedbackHeaderV1 {
time_millis,
location_id: request.bucket_location.location_id,
feedbacks_bytes_hash,
peer: peer_self.clone(),
signature: Signature::zero(),
};
header.signature_generate(&self.server_id.keys.signature_key);
let encoded_post_bundle_feedback = EncodedPostBundleFeedbackV1 {
header,
feedbacks_bytes,
};
Some(encoded_post_bundle_feedback.to_bytes()?)
}
None => None,
};
let cache_result = self.post_bundle_feedback_cache.on_get(&request.bucket_location, &request.already_retrieved_peer_ids, &peer_self, &self.server_id, time_millis);
let get_post_bundle_feedback_response = GetPostBundleFeedbackResponseV1 {
peers_nearer,
cache_request_token: cache_result.cache_request_token,
post_bundle_feedbacks_cached: cache_result.cached_items,
encoded_post_bundle_feedback,
};
Ok((PayloadResponseKind::GetPostBundleFeedbackResponseV1, get_post_bundle_feedback_response.to_bytes_gatherer()?))
}
#[allow(non_snake_case)]
async fn dispatch_network_payload_x_SubmitPostClaimV1(&self, _cancellation_token: CancellationToken, pow: Option<PeerPow>, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
anyhow_assert_eq!(&PayloadRequestKind::SubmitPostClaimV1, &payload_request_kind);
let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
let pow = match pow {
Some(pow) => pow,
None => anyhow::bail!("We need pow for a submit post claim"),
};
let request = SubmitPostClaimV1::from_bytes(&mut bytes)?;
trace!("received SubmitPostClaimV1");
request.bucket_location.validate()?;
let bucket_duration = {
let bucket_duration = BUCKET_DURATIONS.iter().find(|bucket_duration| **bucket_duration == request.bucket_location.duration);
match bucket_duration {
Some(bucket_duration) => *bucket_duration,
None => anyhow::bail!("Unrecognised bucket duration provided"),
}
};
let decoded_post = EncodedPostV1::decode_from_bytes(request.encoded_post_bytes, &request.bucket_location.base_id, false, false)?;
{
let pow_minimum = get_minimum_post_pow(decoded_post.header.post_length, decoded_post.header.linked_base_ids.len(), request.bucket_location.duration);
if pow.pow < pow_minimum {
anyhow::bail!("Insufficient proof of work for this post: actual={} < expected={}", pow.pow, pow_minimum);
}
}
{
let timestamp = BucketLocation::round_down_to_bucket_start(decoded_post.header.time_millis, bucket_duration);
if timestamp != request.bucket_location.bucket_time_millis {
anyhow::bail!("The post timestamp does not match the bucket");
}
}
let client_id = decoded_post.header.client_id()?;
if !decoded_post.header.linked_base_ids.contains(&request.bucket_location.base_id) {
anyhow::bail!("The base_id is not related to the post");
}
if request.bucket_location.bucket_type == BucketType::User && request.bucket_location.base_id != client_id.id {
anyhow::bail!("Only the posting user is allowed to post to a bucket of type USER");
}
if matches!(request.bucket_location.bucket_type, BucketType::ReplyToPost | BucketType::Sequel) {
let original_header_bytes = request.referenced_post_header_bytes
.ok_or_else(|| anyhow::anyhow!("{:?} posts require the original post's header bytes", request.bucket_location.bucket_type))?;
let original_post = EncodedPostV1::decode_from_bytes(original_header_bytes, &client_id.id, false, false)?;
if original_post.post_id != request.bucket_location.base_id {
anyhow::bail!("Referenced post header's post_id does not match the bucket's base_id");
}
if request.bucket_location.bucket_type == BucketType::Sequel {
let original_client_id = original_post.header.client_id()?;
if original_client_id != client_id {
anyhow::bail!("Sequel post author does not match original post author");
}
}
}
{
let delta = (time_millis - decoded_post.header.time_millis).abs();
if delta > config::CLIENT_POST_TIMESTAMP_DELTA_THRESHOLD {
anyhow::bail!("The post timestamp delta is too large ({} > {})", delta, config::CLIENT_POST_TIMESTAMP_DELTA_THRESHOLD);
}
}
let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.bucket_location.location_id, config::REDUNDANT_SERVERS_PER_POST);
let submit_post_claim_token = match among_peers_nearer {
true => {
let _post_bundle_lock = self.environment.get_write_lock_for_location_id(&request.bucket_location.location_id);
let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
let mut post_bundle_metadata = post_bundle_metadata.unwrap_or_else(PostBundleMetadata::zero);
if !post_bundle_metadata.sealed {
post_bundle_metadata.num_posts_granted += 1;
post_bundle_metadata.overflowed = post_bundle_metadata.num_posts > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS || post_bundle_metadata.num_posts_granted > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS_GRANTED;
post_bundle_metadata.sealed = post_bundle_metadata.overflowed || time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;
self.environment.put_post_bundle_metadata(time_millis, &request.bucket_location.location_id, &post_bundle_metadata, 0)?;
}
match post_bundle_metadata.sealed {
false => {
info!("Granted for {}: num_posts={} num_posts_granted={}", request.bucket_location, post_bundle_metadata.num_posts, post_bundle_metadata.num_posts_granted);
Some(SubmitPostClaimTokenV1::new(self.peer_self.read().clone(), request.bucket_location.clone(), decoded_post.post_id, &self.server_id.keys.signature_key))
}
true => {
info!(
"Not granting SubmitPostClaimTokenV1 to {} as we have num_posts={} num_posts_granted={}",
request.bucket_location, post_bundle_metadata.num_posts, post_bundle_metadata.num_posts_granted
);
None
}
}
}
false => None,
};
if submit_post_claim_token.is_some() {
if request.bucket_location.bucket_type == BucketType::User && !request.referenced_hashtags.is_empty() {
let author_verification_key_bytes = &decoded_post.header.verification_key_bytes;
for referenced_hashtag in &request.referenced_hashtags {
let hashtag_id = match Id::from_hashtag_str(referenced_hashtag) {
Ok(id) => id,
Err(_) => continue, };
if !decoded_post.header.linked_base_ids.contains(&hashtag_id) {
continue; }
let mut hll = self.trending_hashtags.get(referenced_hashtag).unwrap_or_else(HyperLogLog::new);
hll.insert(author_verification_key_bytes.as_ref());
self.trending_hashtags.insert(referenced_hashtag.clone(), hll);
}
}
}
let response = SubmitPostClaimResponseV1 { peers_nearer, submit_post_claim_token };
Ok((PayloadResponseKind::SubmitPostClaimResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
}
#[allow(non_snake_case)]
async fn dispatch_network_payload_x_SubmitPostCommitV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
anyhow_assert_eq!(&PayloadRequestKind::SubmitPostCommitV1, &payload_request_kind);
let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
let request = SubmitPostCommitV1::from_bytes(&mut bytes)?;
trace!("received SubmitPostCommitV1");
let peer_self = self.peer_self.read();
if request.submit_post_claim_token.peer.id != peer_self.id {
anyhow::bail!("The submit_post_claim_token is not from us");
}
request.bucket_location.validate()?;
if request.bucket_location != request.submit_post_claim_token.bucket_location {
anyhow::bail!("The location_id in the SubmitPostCommit does not match the bucket_location in the SubmitPostClaimToken");
}
let decoded_post = EncodedPostV1::decode_from_bytes(request.encoded_post_bytes.clone(), &request.bucket_location.base_id, true, false)?;
if decoded_post.post_id != request.submit_post_claim_token.post_id {
anyhow::bail!("The post_id of the committed post does not match the post_id in the SubmitPostClaimToken");
}
let _post_bundle_lock = self.environment.get_write_lock_for_location_id(&request.bucket_location.location_id);
let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &request.bucket_location.location_id)?;
let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.bucket_location.location_id)?;
let mut post_bundle_metadata = post_bundle_metadata.unwrap_or_else(PostBundleMetadata::zero);
let mut post_bundle = match post_bundle_bytes {
Some(bytes) => {
let bytes = Bytes::from_owner(bytes);
let bundle = EncodedPostBundleV1::from_bytes(bytes, true)?;
bundle
}
None => {
let header = EncodedPostBundleHeaderV1 {
time_millis: TimeMillis::zero(),
location_id: request.bucket_location.location_id,
overflowed: false,
sealed: false,
num_posts: 0,
encoded_post_ids: vec![],
encoded_post_lengths: vec![],
encoded_post_healed: HashSet::new(),
peer: self.peer_self.read().clone(),
signature: Signature::zero(),
};
EncodedPostBundleV1 { header, encoded_posts_bytes: Bytes::new() }
}
};
if post_bundle.header.encoded_post_ids.contains(&decoded_post.post_id) {
anyhow::bail!("Post {} is already in the bundle", decoded_post.post_id);
}
post_bundle.header.time_millis = time_millis;
post_bundle.header.num_posts += 1;
post_bundle.header.overflowed = post_bundle.header.num_posts > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS || post_bundle_metadata.num_posts_granted > config::ENCODED_POST_BUNDLE_V1_OVERFLOWED_NUM_POSTS_GRANTED;
post_bundle.header.sealed = post_bundle.header.overflowed || time_millis > request.bucket_location.bucket_time_millis + request.bucket_location.duration + config::ENCODED_POST_BUNDLE_V1_ELAPSED_THRESHOLD_MILLIS;
post_bundle.header.encoded_post_ids.push(decoded_post.post_id);
post_bundle.header.encoded_post_lengths.push(request.encoded_post_bytes.len());
post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
let mut posts_mut = BytesMut::from(post_bundle.encoded_posts_bytes.as_ref());
posts_mut.extend_from_slice(request.encoded_post_bytes.as_ref());
post_bundle.encoded_posts_bytes = posts_mut.freeze();
let post_bundle_bytes_new = post_bundle.to_bytes()?;
post_bundle_metadata.num_posts = post_bundle.header.num_posts;
post_bundle_metadata.overflowed = post_bundle.header.overflowed;
post_bundle_metadata.sealed = post_bundle.header.sealed;
post_bundle_metadata.size = post_bundle.header.encoded_post_lengths.iter().sum();
{
self.environment.put_post_bundle_bytes(time_millis, &request.bucket_location.location_id, &post_bundle_bytes_new)?;
self.environment
.put_post_bundle_metadata(time_millis, &request.bucket_location.location_id, &post_bundle_metadata, request.encoded_post_bytes.len())?;
}
info!("Persisted for {}: num_posts={} num_posts_granted={}", request.bucket_location, post_bundle_metadata.num_posts, post_bundle_metadata.num_posts_granted);
let submit_post_commit_token = SubmitPostCommitTokenV1::new(peer_self.clone(), request.bucket_location, decoded_post.post_id, &self.server_id.keys.signature_key);
let response = SubmitPostCommitResponseV1 { submit_post_commit_token };
Ok((PayloadResponseKind::SubmitPostCommitResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
}
#[allow(non_snake_case)]
async fn dispatch_network_payload_x_SubmitPostFeedbackV1(&self, _cancellation_token: CancellationToken, pow: Option<PeerPow>, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
anyhow_assert_eq!(&PayloadRequestKind::SubmitPostFeedbackV1, &payload_request_kind);
let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
let request = SubmitPostFeedbackV1::from_bytes(&mut bytes)?;
trace!("received SubmitPostFeedbackV1");
let pow = pow.ok_or_else(|| anyhow::anyhow!("pow required for SubmitPostFeedbackV1"))?;
if pow.pow < config::POW_MINIMUM_PER_FEEDBACK {
anyhow::bail!("Insufficient pow for feedback: {} < {}", pow.pow, config::POW_MINIMUM_PER_FEEDBACK);
}
request.encoded_post_feedback.pow_verify()?;
let location_id = request.bucket_location.location_id;
let (peers_nearer, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&location_id, config::REDUNDANT_SERVERS_PER_POST);
let accepted = (|| -> anyhow::Result<bool> {
if !among_peers_nearer {
return Ok(false);
}
let _post_bundle_lock = self.environment.get_read_lock_for_location_id(&location_id);
let Some(post_bundle_bytes) = self.environment.get_post_bundle_bytes(time_millis, &location_id)?
else {
return Ok(false);
};
let post_bundle = EncodedPostBundleV1::from_bytes(Bytes::from_owner(post_bundle_bytes), false)?;
if !post_bundle.header.encoded_post_ids.contains(&request.encoded_post_feedback.post_id) {
return Ok(false);
}
Ok(true)
})()?;
if accepted {
trace!("Accepted post feedback for location_id={} encoded_post_feedback={:?}", location_id, request.encoded_post_feedback);
self.environment.put_post_feedback_if_more_powerful(time_millis, &location_id, &request.encoded_post_feedback)?;
}
let response = SubmitPostFeedbackResponseV1 { peers_nearer, accepted };
Ok((PayloadResponseKind::SubmitPostFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
}
#[allow(non_snake_case)]
async fn dispatch_network_payload_x_HealPostBundleClaimV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
anyhow_assert_eq!(&PayloadRequestKind::HealPostBundleClaimV1, &payload_request_kind);
fn generate_negatory_response() -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
let response = HealPostBundleClaimResponseV1 { needed_post_ids: vec![], token: None };
Ok((PayloadResponseKind::HealPostBundleClaimResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
}
let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
let request = HealPostBundleClaimV1::from_bytes(&mut bytes)?;
trace!("received HealPostBundleClaimV1");
request.bucket_location.validate()?;
if request.bucket_location.location_id != request.donor_header.location_id {
anyhow::bail!("HealPostBundleClaimV1: bucket_location.location_id does not match donor_header.location_id");
}
request.donor_header.verify()?;
let (_, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.donor_header.location_id, config::REDUNDANT_SERVERS_PER_POST);
if !among_peers_nearer {
return generate_negatory_response();
}
if self.heal_in_progress.contains_key(&request.donor_header.location_id) {
return generate_negatory_response();
}
let _lock = self.environment.get_read_lock_for_location_id(&request.donor_header.location_id);
let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.donor_header.location_id)?;
let our_post_ids: HashSet<Id> = match post_bundle_bytes {
Some(bytes) => {
let bundle = EncodedPostBundleV1::from_bytes(Bytes::from_owner(bytes), false)?;
bundle.header.encoded_post_ids.into_iter().collect()
}
None => HashSet::new(),
};
let needed_post_ids: Vec<Id> = request.donor_header.encoded_post_ids.iter().filter(|id| !our_post_ids.contains(*id)).copied().collect();
if needed_post_ids.is_empty() {
return generate_negatory_response();
}
self.heal_in_progress.insert(request.donor_header.location_id, ());
let token = Some(HealPostBundleClaimTokenV1::new(
self.peer_self.read().clone(),
request.bucket_location,
needed_post_ids.clone(),
request.donor_header.signature,
&self.server_id.keys.signature_key,
));
let response = HealPostBundleClaimResponseV1 { needed_post_ids, token };
Ok((PayloadResponseKind::HealPostBundleClaimResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
}
#[allow(non_snake_case)]
async fn dispatch_network_payload_x_HealPostBundleCommitV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
anyhow_assert_eq!(&PayloadRequestKind::HealPostBundleCommitV1, &payload_request_kind);
let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
let request = HealPostBundleCommitV1::from_bytes(&mut bytes)?;
trace!("received HealPostBundleCommitV1");
let peer_self = self.peer_self.read().clone();
if request.token.peer.id != peer_self.id {
anyhow::bail!("HealPostBundleCommitV1: token was not issued by this server");
}
request.token.verify()?;
if request.donor_header.signature != request.token.donor_header_signature {
anyhow::bail!("HealPostBundleCommitV1: donor_header signature does not match token");
}
request.donor_header.verify()?;
if request.token.bucket_location.location_id != request.donor_header.location_id {
anyhow::bail!("HealPostBundleCommitV1: token location_id does not match donor_header");
}
let location_id = request.donor_header.location_id;
let mut remaining_bytes = request.encoded_posts_bytes.clone();
let mut posts_to_add: Vec<(Id, Bytes)> = Vec::new();
for post_id in &request.token.needed_post_ids {
let len = request
.donor_header
.encoded_post_ids
.iter()
.zip(request.donor_header.encoded_post_lengths.iter())
.find(|(id, _)| *id == post_id)
.map(|(_, len)| *len)
.ok_or_else(|| anyhow::anyhow!("needed_post_id {} not found in donor_header", post_id))?;
if remaining_bytes.len() < len {
anyhow::bail!("HealPostBundleCommitV1: not enough bytes for post {}", post_id);
}
let post_bytes = remaining_bytes.split_to(len);
posts_to_add.push((*post_id, post_bytes));
}
if !remaining_bytes.is_empty() {
anyhow::bail!("HealPostBundleCommitV1: {} excess bytes", remaining_bytes.len());
}
for (post_id, post_bytes) in &posts_to_add {
EncodedPostV1::decode_from_bytes(post_bytes.clone(), &request.token.bucket_location.base_id, true, true).map_err(|e| anyhow::anyhow!("HealPostBundleCommitV1: post {} failed decryption: {}", post_id, e))?;
}
let _lock = self.environment.get_write_lock_for_location_id(&location_id);
let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &location_id)?;
let post_bundle_metadata = self.environment.get_post_bundle_metadata(time_millis, &location_id)?;
let mut post_bundle_metadata = post_bundle_metadata.unwrap_or_else(PostBundleMetadata::zero);
let mut post_bundle = match post_bundle_bytes {
Some(b) => EncodedPostBundleV1::from_bytes(Bytes::from_owner(b), true)?,
None => EncodedPostBundleV1 {
header: EncodedPostBundleHeaderV1 {
time_millis: TimeMillis::zero(),
location_id,
overflowed: request.donor_header.overflowed,
sealed: request.donor_header.sealed,
num_posts: 0,
encoded_post_ids: vec![],
encoded_post_lengths: vec![],
encoded_post_healed: HashSet::new(),
peer: peer_self.clone(),
signature: Signature::zero(),
},
encoded_posts_bytes: Bytes::new(),
},
};
let our_post_ids: HashSet<Id> = post_bundle.header.encoded_post_ids.iter().copied().collect();
let mut posts_mut = BytesMut::from(post_bundle.encoded_posts_bytes.as_ref());
let mut added_any = false;
for (post_id, post_bytes) in posts_to_add {
if !our_post_ids.contains(&post_id) {
let len = post_bytes.len();
posts_mut.extend_from_slice(&post_bytes);
post_bundle.header.encoded_post_ids.push(post_id);
post_bundle.header.encoded_post_lengths.push(len);
post_bundle.header.encoded_post_healed.insert(post_id);
added_any = true;
}
}
post_bundle.encoded_posts_bytes = posts_mut.freeze();
if added_any {
post_bundle.header.time_millis = time_millis;
post_bundle.header.num_posts = post_bundle.header.encoded_post_ids.len() as u8;
post_bundle.header.signature_generate(&self.server_id.keys.signature_key)?;
let new_bytes = post_bundle.to_bytes()?;
post_bundle_metadata.num_posts = post_bundle.header.num_posts;
post_bundle_metadata.size = post_bundle.header.encoded_post_lengths.iter().sum();
self.environment.put_post_bundle_bytes(time_millis, &location_id, &new_bytes)?;
self.environment.put_post_bundle_metadata(time_millis, &location_id, &post_bundle_metadata, 0)?;
info!("Healed {} post(s) for location_id={}", post_bundle.header.encoded_post_healed.len(), location_id);
}
self.heal_in_progress.invalidate(&location_id);
let response = HealPostBundleCommitResponseV1 { accepted: added_any };
Ok((PayloadResponseKind::HealPostBundleCommitResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
}
#[allow(non_snake_case)]
async fn dispatch_network_payload_x_HealPostBundleFeedbackV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
anyhow_assert_eq!(&PayloadRequestKind::HealPostBundleFeedbackV1, &payload_request_kind);
let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
let request = HealPostBundleFeedbackV1::from_bytes(&mut bytes)?;
trace!("received HealPostBundleFeedbackV1 for location_id={}", request.location_id);
let (_, among_peers_nearer) = self.kademlia.read().get_peers_for_key(&request.location_id, config::REDUNDANT_SERVERS_PER_POST);
if !among_peers_nearer {
let response = HealPostBundleFeedbackResponseV1 { accepted_count: 0 };
return Ok((PayloadResponseKind::HealPostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)));
}
let _post_bundle_lock = self.environment.get_read_lock_for_location_id(&request.location_id);
let post_bundle_bytes = self.environment.get_post_bundle_bytes(time_millis, &request.location_id)?;
let Some(post_bundle_bytes) = post_bundle_bytes
else {
let response = HealPostBundleFeedbackResponseV1 { accepted_count: 0 };
return Ok((PayloadResponseKind::HealPostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)));
};
let post_bundle = EncodedPostBundleV1::from_bytes(Bytes::from_owner(post_bundle_bytes), false)?;
let mut accepted_count: u32 = 0;
for feedback in &request.encoded_post_feedbacks {
if !post_bundle.header.encoded_post_ids.contains(&feedback.post_id) {
continue;
}
self.environment.put_post_feedback_if_more_powerful(time_millis, &request.location_id, feedback)?;
accepted_count += 1;
}
if accepted_count > 0 {
trace!("Accepted {} healed feedback(s) for location_id={}", accepted_count, request.location_id);
}
let response = HealPostBundleFeedbackResponseV1 { accepted_count };
Ok((PayloadResponseKind::HealPostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
}
#[allow(non_snake_case)]
async fn dispatch_network_payload_x_CachePostBundleV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
anyhow_assert_eq!(&PayloadRequestKind::CachePostBundleV1, &payload_request_kind);
let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
let request = CachePostBundleV1::from_bytes(&mut bytes)?;
trace!("received CachePostBundleV1 for bucket_location={}", request.token.bucket_location);
let peer_self = self.peer_self.read().clone();
if request.token.peer.id != peer_self.id {
anyhow::bail!("CachePostBundleV1: token was not issued by this server");
}
request.token.verify()?;
if request.token.is_expired(time_millis) {
anyhow::bail!("CachePostBundleV1: token has expired");
}
let mut any_accepted = false;
for bundle_bytes in request.encoded_post_bundles {
let parse_result: anyhow::Result<()> = try {
let encoded_post_bundle = EncodedPostBundleV1::from_bytes(bundle_bytes.clone(), true)?;
encoded_post_bundle.verify(&request.token.bucket_location.base_id)?;
let originator_peer_id = encoded_post_bundle.header.peer.id;
let is_sealed = encoded_post_bundle.header.sealed;
if self.post_bundle_cache.on_upload(request.token.bucket_location.location_id, originator_peer_id, bundle_bytes, time_millis, is_sealed) {
any_accepted = true;
}
};
if let Err(e) = &parse_result {
warn!("CachePostBundleV1: failed to parse bundle: {}", e);
}
}
let response = CachePostBundleResponseV1 { accepted: any_accepted };
Ok((PayloadResponseKind::CachePostBundleResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
}
#[allow(non_snake_case)]
async fn dispatch_network_payload_x_CachePostBundleFeedbackV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
anyhow_assert_eq!(&PayloadRequestKind::CachePostBundleFeedbackV1, &payload_request_kind);
let time_millis = self.runtime_services.time_provider.as_ref().current_time_millis();
let request = CachePostBundleFeedbackV1::from_bytes(&mut bytes)?;
trace!("received CachePostBundleFeedbackV1 for bucket_location={}", request.token.bucket_location);
let peer_self = self.peer_self.read().clone();
if request.token.peer.id != peer_self.id {
anyhow::bail!("CachePostBundleFeedbackV1: token was not issued by this server");
}
request.token.verify()?;
if request.token.is_expired(time_millis) {
anyhow::bail!("CachePostBundleFeedbackV1: token has expired");
}
let result: anyhow::Result<bool> = try {
let encoded_post_bundle_feedback = EncodedPostBundleFeedbackV1::from_bytes(request.encoded_post_bundle_feedback_bytes.clone())?;
encoded_post_bundle_feedback.verify()?;
let originator_peer_id = encoded_post_bundle_feedback.header.peer.id;
self.post_bundle_feedback_cache
.on_upload(request.token.bucket_location.location_id, originator_peer_id, request.encoded_post_bundle_feedback_bytes, time_millis, false)
};
let accepted = result.unwrap_or_else(|e| {
warn!("CachePostBundleFeedbackV1: parse error: {}", e);
false
});
let response = CachePostBundleFeedbackResponseV1 { accepted };
Ok((PayloadResponseKind::CachePostBundleFeedbackResponseV1, BytesGatherer::from_bytes(json::struct_to_bytes(&response)?)))
}
#[allow(non_snake_case)]
async fn dispatch_network_payload_x_FetchUrlPreviewV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
anyhow_assert_eq!(&PayloadRequestKind::FetchUrlPreviewV1, &payload_request_kind);
let request = FetchUrlPreviewV1::from_bytes(&mut bytes)?;
trace!("received FetchUrlPreviewV1 for url={}", request.url);
if !request.url.starts_with("https://") {
anyhow::bail!("FetchUrlPreviewV1 SSRF: only https:// URLs are allowed");
}
let host_and_port = request.url["https://".len()..].split(&['/', '?', '#'][..]).next().unwrap_or("");
let host = if host_and_port.starts_with('[') {
host_and_port.trim_start_matches('[').split(']').next().unwrap_or("")
} else {
host_and_port.split(':').next().unwrap_or(host_and_port)
};
if host.is_empty() {
anyhow::bail!("FetchUrlPreviewV1 SSRF: could not extract host from URL");
}
if host.parse::<std::net::IpAddr>().is_ok() {
anyhow::bail!("FetchUrlPreviewV1 SSRF: bare IP addresses are not allowed");
}
let resolved_socket_addrs: Vec<std::net::SocketAddr> = tokio::net::lookup_host((host, 443u16))
.await
.map_err(|e| anyhow::anyhow!("FetchUrlPreviewV1 SSRF: DNS resolution failed for {}: {}", host, e))?
.collect();
if resolved_socket_addrs.is_empty() {
anyhow::bail!("FetchUrlPreviewV1 SSRF: DNS returned no addresses for {}", host);
}
for socket_addr in &resolved_socket_addrs {
let ip = socket_addr.ip();
if is_ssrf_protected_ip(ip) {
anyhow::bail!("FetchUrlPreviewV1 SSRF: {} resolved to protected address {}", host, ip);
}
}
let http_client = reqwest::Client::builder()
.connect_timeout(std::time::Duration::from_secs(1))
.timeout(std::time::Duration::from_secs(3))
.user_agent("hashiverse-preview/1.0")
.resolve_to_addrs(host, &resolved_socket_addrs)
.redirect(reqwest::redirect::Policy::none())
.no_proxy()
.build()?;
const URL_FETCH_MAX_BODY_BYTES: usize = 2 * 1024 * 1024;
let mut http_response = http_client.get(&request.url).send().await?;
if let Some(content_length) = http_response.content_length() {
if content_length > URL_FETCH_MAX_BODY_BYTES as u64 {
anyhow::bail!("FetchUrlPreviewV1: Content-Length {} exceeds {} byte limit", content_length, URL_FETCH_MAX_BODY_BYTES);
}
}
let mut body_bytes = BytesMut::new();
while let Some(chunk) = http_response.chunk().await? {
let remaining = URL_FETCH_MAX_BODY_BYTES - body_bytes.len();
body_bytes.extend_from_slice(&chunk[..chunk.len().min(remaining)]);
if body_bytes.len() >= URL_FETCH_MAX_BODY_BYTES {
break;
}
}
let html = String::from_utf8_lossy(&body_bytes).into_owned();
let preview_data = url_preview::extract_url_preview(&html);
let response = FetchUrlPreviewResponseV1 {
url: if preview_data.canonical_url.is_empty() { request.url } else { preview_data.canonical_url },
title: preview_data.title,
description: preview_data.description,
image_url: preview_data.image_url,
};
Ok((PayloadResponseKind::FetchUrlPreviewResponseV1, BytesGatherer::from_bytes(response.to_bytes()?)))
}
#[allow(non_snake_case)]
async fn dispatch_network_payload_x_TrendingHashtagsFetchV1(&self, _cancellation_token: CancellationToken, payload_request_kind: PayloadRequestKind, mut bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
anyhow_assert_eq!(&PayloadRequestKind::TrendingHashtagsFetchV1, &payload_request_kind);
let request = TrendingHashtagsFetchV1::from_bytes(&mut bytes)?;
trace!("received TrendingHashtagsFetchV1 with limit={}", request.limit);
let time_millis = self.runtime_services.time_provider.current_time_millis();
let cached_response = {
let cache = self.trending_hashtags_response_cache.lock();
match cache.as_ref() {
Some((cached_time, cached_response)) if (time_millis - *cached_time) < MILLIS_IN_SECOND.const_mul(30) => {
Some(cached_response.clone())
}
_ => None,
}
};
let mut response = match cached_response {
Some(mut cached) => {
cached.trending_hashtags.truncate(request.limit as usize);
cached
}
None => {
let mut trending_hashtags: Vec<TrendingHashtagV1> = self.trending_hashtags.iter()
.map(|(hashtag, hll)| TrendingHashtagV1 {
hashtag: hashtag.as_ref().clone(),
count: hll.count(),
})
.filter(|entry| entry.count > 0)
.collect();
trending_hashtags.sort_by(|a, b| b.count.cmp(&a.count));
let full_response = TrendingHashtagsFetchResponseV1 { trending_hashtags };
{
let mut cache = self.trending_hashtags_response_cache.lock();
*cache = Some((time_millis, full_response.clone()));
}
let mut truncated_response = full_response;
truncated_response.trending_hashtags.truncate(request.limit as usize);
truncated_response
}
};
top_up_trending_hashtags_with_fallback(&mut response.trending_hashtags, request.limit, TRENDING_HASHTAGS_FALLBACK);
Ok((PayloadResponseKind::TrendingHashtagsFetchResponseV1, BytesGatherer::from_bytes(response.to_bytes()?)))
}
#[allow(non_snake_case)]
async fn dispatch_network_payload_x_PeerStatsRequestV1(&self, _cancellation_token: CancellationToken, pow: Option<PeerPow>, payload_request_kind: PayloadRequestKind, bytes: Bytes) -> anyhow::Result<(PayloadResponseKind, BytesGatherer)> {
anyhow_assert_eq!(&PayloadRequestKind::PeerStatsRequestV1, &payload_request_kind);
let _request = PeerStatsRequestV1::from_bytes(&bytes)?;
let pow = pow.ok_or_else(|| anyhow::anyhow!("pow required for PeerStatsRequestV1"))?;
if pow.pow < config::POW_MINIMUM_PER_PEER_STATS {
anyhow::bail!("Insufficient pow for PeerStatsRequestV1: {} < {}", pow.pow, config::POW_MINIMUM_PER_PEER_STATS);
}
let time_millis = self.runtime_services.time_provider.current_time_millis();
let cached_response = {
let cache = self.peer_stats_response_cache.lock();
match cache.as_ref() {
Some((cached_time, cached_response)) if (time_millis - *cached_time) < MILLIS_IN_SECOND.const_mul(60) => Some(cached_response.clone()),
_ => None,
}
};
let response = match cached_response {
Some(cached) => cached,
None => {
let doc = serde_json::json!({
"version": env!("CARGO_PKG_VERSION"),
"requests": request_counts_subtree(&self.request_counters),
"system": system_stats_subtree(),
"kademlia": kademlia_stats_subtree(&self.kademlia.read()),
"environment": environment_stats_subtree(&self.environment),
});
let json_bytes = serde_json::to_vec(&doc)?;
let json_compressed = compression::compress_for_speed(&json_bytes)?.to_bytes();
let signing_input = PeerStatsResponseV1::signing_input(time_millis, &json_compressed);
let signature = signing::sign(&self.server_id.keys.signature_key, &signing_input);
let response = PeerStatsResponseV1 {
peer: self.peer_self.read().clone(),
timestamp: time_millis,
json_compressed,
signature,
};
*self.peer_stats_response_cache.lock() = Some((time_millis, response.clone()));
response
}
};
Ok((PayloadResponseKind::PeerStatsResponseV1, BytesGatherer::from_bytes(response.to_bytes()?)))
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_trending_hashtag(hashtag: &str, count: u64) -> TrendingHashtagV1 {
TrendingHashtagV1 { hashtag: hashtag.to_string(), count }
}
#[test]
fn top_up_adds_fallback_when_empty() {
let mut trending_hashtags: Vec<TrendingHashtagV1> = vec![];
top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 5, &["#hashiverse", "#news"]);
assert_eq!(trending_hashtags.len(), 2);
assert_eq!(trending_hashtags[0].hashtag, "#hashiverse");
assert_eq!(trending_hashtags[0].count, 0);
assert_eq!(trending_hashtags[1].hashtag, "#news");
assert_eq!(trending_hashtags[1].count, 0);
}
#[test]
fn top_up_respects_limit_smaller_than_fallback_list() {
let mut trending_hashtags: Vec<TrendingHashtagV1> = vec![];
top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 1, &["#hashiverse", "#news"]);
assert_eq!(trending_hashtags.len(), 1);
assert_eq!(trending_hashtags[0].hashtag, "#hashiverse");
}
#[test]
fn top_up_preserves_fallback_order() {
let mut trending_hashtags: Vec<TrendingHashtagV1> = vec![];
top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 3, &["#first", "#second", "#third"]);
assert_eq!(trending_hashtags[0].hashtag, "#first");
assert_eq!(trending_hashtags[1].hashtag, "#second");
assert_eq!(trending_hashtags[2].hashtag, "#third");
}
#[test]
fn top_up_is_noop_when_already_at_limit() {
let mut trending_hashtags = vec![make_trending_hashtag("#rust", 10), make_trending_hashtag("#golang", 5)];
top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 2, &["#hashiverse", "#news"]);
assert_eq!(trending_hashtags.len(), 2);
assert_eq!(trending_hashtags[0].hashtag, "#rust");
assert_eq!(trending_hashtags[1].hashtag, "#golang");
}
#[test]
fn top_up_partially_fills_when_real_trending_exists() {
let mut trending_hashtags = vec![make_trending_hashtag("#rust", 10)];
top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 3, &["#hashiverse", "#news"]);
assert_eq!(trending_hashtags.len(), 3);
assert_eq!(trending_hashtags[0].hashtag, "#rust");
assert_eq!(trending_hashtags[0].count, 10);
assert_eq!(trending_hashtags[1].hashtag, "#hashiverse");
assert_eq!(trending_hashtags[1].count, 0);
assert_eq!(trending_hashtags[2].hashtag, "#news");
assert_eq!(trending_hashtags[2].count, 0);
}
#[test]
fn top_up_skips_fallback_already_present_exact_match() {
let mut trending_hashtags = vec![make_trending_hashtag("#hashiverse", 42)];
top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 3, &["#hashiverse", "#news"]);
assert_eq!(trending_hashtags.len(), 2);
assert_eq!(trending_hashtags[0].hashtag, "#hashiverse");
assert_eq!(trending_hashtags[0].count, 42, "real trending entry must not be overwritten by filler");
assert_eq!(trending_hashtags[1].hashtag, "#news");
assert_eq!(trending_hashtags[1].count, 0);
}
#[test]
fn top_up_dedup_is_case_insensitive_and_prefix_agnostic() {
let mut trending_hashtags = vec![make_trending_hashtag("HashiVerse", 7)];
top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 3, &["#hashiverse", "#news"]);
assert_eq!(trending_hashtags.len(), 2);
assert_eq!(trending_hashtags[0].hashtag, "HashiVerse");
assert_eq!(trending_hashtags[1].hashtag, "#news");
}
#[test]
fn top_up_with_empty_fallback_is_noop() {
let mut trending_hashtags: Vec<TrendingHashtagV1> = vec![];
top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 5, &[]);
assert_eq!(trending_hashtags.len(), 0);
}
#[test]
fn top_up_handles_zero_limit() {
let mut trending_hashtags: Vec<TrendingHashtagV1> = vec![];
top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 0, &["#hashiverse", "#news"]);
assert_eq!(trending_hashtags.len(), 0);
}
#[test]
fn top_up_exhausts_fallback_without_reaching_limit() {
let mut trending_hashtags: Vec<TrendingHashtagV1> = vec![];
top_up_trending_hashtags_with_fallback(&mut trending_hashtags, 10, &["#hashiverse", "#news"]);
assert_eq!(trending_hashtags.len(), 2, "should stop at the end of the fallback list, not pad further");
}
mod peer_stats {
use super::*;
use crate::environment::mem_environment_store::MemEnvironmentFactory;
use crate::environment::environment::EnvironmentFactory;
use crate::server::args::Args;
use crate::server::hashiverse_server::HashiverseServer;
use hashiverse_lib::protocol::payload::payload::{PAYLOAD_REQUEST_KIND_COUNT, PeerStatsRequestV1, PeerStatsResponseV1};
use hashiverse_lib::protocol::peer::PeerPow;
use hashiverse_lib::tools::compression;
use hashiverse_lib::tools::pow_generator::single_threaded_pow_generator::SingleThreadedPowGenerator;
use hashiverse_lib::tools::runtime_services::RuntimeServices;
use hashiverse_lib::tools::time::TimeMillis;
use hashiverse_lib::tools::time_provider::time_provider::RealTimeProvider;
use hashiverse_lib::tools::types::{Pow, VerificationKey};
use hashiverse_lib::transport::mem_transport::MemTransportFactory;
use std::sync::Arc;
use std::sync::atomic::Ordering;
async fn make_server() -> anyhow::Result<Arc<HashiverseServer>> {
let time_provider = Arc::new(RealTimeProvider::default());
let transport_factory = MemTransportFactory::default();
let pow_generator = Arc::new(SingleThreadedPowGenerator::new());
let runtime_services = Arc::new(RuntimeServices { time_provider, transport_factory, pow_generator });
let environment_factory = Arc::new(MemEnvironmentFactory::new(""));
let args = Args::default_for_testing();
HashiverseServer::new(runtime_services, environment_factory, args).await
}
fn synthetic_pow(pow: Pow) -> PeerPow {
let mut peer_pow = PeerPow::zero();
peer_pow.pow = pow;
peer_pow
}
fn empty_request_bytes() -> Bytes {
PeerStatsRequestV1 {}.to_bytes().expect("PeerStatsRequestV1 must serialise")
}
fn decode_doc(response: &PeerStatsResponseV1) -> serde_json::Value {
let bytes = compression::decompress(&response.json_compressed).expect("decompress doc").to_bytes();
serde_json::from_slice(&bytes).expect("doc must be valid JSON")
}
#[tokio::test]
async fn rejects_insufficient_pow() {
let server = make_server().await.expect("server must start");
let result = server
.dispatch_network_payload_x_PeerStatsRequestV1(
CancellationToken::new(),
Some(synthetic_pow(Pow(config::POW_MINIMUM_PER_PEER_STATS.0.saturating_sub(1)))),
PayloadRequestKind::PeerStatsRequestV1,
empty_request_bytes(),
)
.await;
assert!(result.is_err(), "expected insufficient PoW to be rejected");
}
#[tokio::test]
async fn returns_response_with_expected_top_level_keys() {
let server = make_server().await.expect("server must start");
let (response_kind, gatherer) = server
.dispatch_network_payload_x_PeerStatsRequestV1(
CancellationToken::new(),
Some(synthetic_pow(config::POW_MINIMUM_PER_PEER_STATS)),
PayloadRequestKind::PeerStatsRequestV1,
empty_request_bytes(),
)
.await
.expect("handler must succeed at threshold pow");
assert_eq!(response_kind, PayloadResponseKind::PeerStatsResponseV1);
let response_bytes = gatherer.to_bytes();
let response = PeerStatsResponseV1::from_bytes(&response_bytes).expect("response must decode");
let doc = decode_doc(&response);
assert!(doc.get("version").and_then(|v| v.as_str()).map(|s| !s.is_empty()).unwrap_or(false), "version must be a non-empty string");
assert_eq!(doc["version"].as_str().unwrap(), env!("CARGO_PKG_VERSION"));
assert!(doc.get("requests").is_some(), "requests subtree missing");
assert!(doc.get("system").is_some(), "system subtree missing");
assert!(doc.get("kademlia").is_some(), "kademlia subtree missing");
assert!(doc.get("environment").is_some(), "environment subtree missing");
for key in ["memory_total_bytes", "memory_free_bytes", "disk_total_bytes", "disk_free_bytes", "load_1m", "load_5m", "load_15m"] {
assert!(doc["system"].get(key).map(|v| v.is_number()).unwrap_or(false), "system.{key} must be a number");
}
for key in ["post_bundle_count", "post_bundle_feedback_count", "post_bundle_total_bytes"] {
assert!(doc["environment"].get(key).map(|v| v.is_number()).unwrap_or(false), "environment.{key} must be a number");
}
}
#[tokio::test]
async fn counters_reflect_recorded_dispatches() {
let server = make_server().await.expect("server must start");
for _ in 0..7 {
server.request_counters[PayloadRequestKind::PingV1 as usize].fetch_add(1, Ordering::Relaxed);
}
let (_, gatherer) = server
.dispatch_network_payload_x_PeerStatsRequestV1(
CancellationToken::new(),
Some(synthetic_pow(config::POW_MINIMUM_PER_PEER_STATS)),
PayloadRequestKind::PeerStatsRequestV1,
empty_request_bytes(),
)
.await
.expect("handler must succeed");
let response = PeerStatsResponseV1::from_bytes(&gatherer.to_bytes()).expect("response must decode");
let doc = decode_doc(&response);
assert_eq!(doc["requests"]["PingV1"].as_u64(), Some(7));
}
#[tokio::test]
async fn cache_returns_byte_identical_response_within_ttl() {
let server = make_server().await.expect("server must start");
let pow = synthetic_pow(config::POW_MINIMUM_PER_PEER_STATS);
let (_, gatherer_a) = server
.dispatch_network_payload_x_PeerStatsRequestV1(CancellationToken::new(), Some(pow.clone()), PayloadRequestKind::PeerStatsRequestV1, empty_request_bytes())
.await
.expect("first call must succeed");
let bytes_a = gatherer_a.to_bytes();
server.request_counters[PayloadRequestKind::PingV1 as usize].fetch_add(99, Ordering::Relaxed);
let (_, gatherer_b) = server
.dispatch_network_payload_x_PeerStatsRequestV1(CancellationToken::new(), Some(pow), PayloadRequestKind::PeerStatsRequestV1, empty_request_bytes())
.await
.expect("second call must succeed");
let bytes_b = gatherer_b.to_bytes();
assert_eq!(bytes_a, bytes_b, "cached response should be byte-identical across the TTL");
}
#[tokio::test]
async fn signature_verifies_and_fails_on_tamper() {
let server = make_server().await.expect("server must start");
let (_, gatherer) = server
.dispatch_network_payload_x_PeerStatsRequestV1(
CancellationToken::new(),
Some(synthetic_pow(config::POW_MINIMUM_PER_PEER_STATS)),
PayloadRequestKind::PeerStatsRequestV1,
empty_request_bytes(),
)
.await
.expect("handler must succeed");
let response = PeerStatsResponseV1::from_bytes(&gatherer.to_bytes()).expect("response must decode");
let verification_key = VerificationKey::from_bytes(&response.peer.verification_key_bytes).expect("verification key must decode");
let signing_input = PeerStatsResponseV1::signing_input(response.timestamp, &response.json_compressed);
signing::verify(&verification_key, &response.signature, &signing_input).expect("signature must verify against transmitted bytes");
let mut tampered = response.json_compressed.to_vec();
let tamper_index = tampered.len() / 2;
tampered[tamper_index] ^= 0xff;
let tampered_signing_input = PeerStatsResponseV1::signing_input(response.timestamp, &tampered);
assert!(signing::verify(&verification_key, &response.signature, &tampered_signing_input).is_err(), "verification must fail when json_compressed is tampered");
let bumped_signing_input = PeerStatsResponseV1::signing_input(TimeMillis(response.timestamp.0 + 1), &response.json_compressed);
assert!(signing::verify(&verification_key, &response.signature, &bumped_signing_input).is_err(), "verification must fail when timestamp is mutated");
}
#[test]
fn request_counts_subtree_covers_every_variant() {
let counters: [std::sync::atomic::AtomicU64; PAYLOAD_REQUEST_KIND_COUNT] = std::array::from_fn(|_| std::sync::atomic::AtomicU64::new(0));
let subtree = request_counts_subtree(&counters);
let map = subtree.as_object().expect("request_counts subtree must be an object");
assert_eq!(map.len(), PAYLOAD_REQUEST_KIND_COUNT);
}
}
}