use crate::anyhow_assert_eq;
use crate::client::key_locker::key_locker::KeyLocker;
use crate::client::peer_tracker::peer_tracker::PeerTracker;
use crate::client::post_bundle::live_post_bundle_manager::LivePostBundleManager;
use crate::client::post_bundle::post_bundle_manager::PostBundleManager;
use crate::client::timeline::recent_posts_pen::RecentPostsPen;
use crate::protocol::posting::encoded_post::{EncodedPostBytesV1, EncodedPostV1};
use crate::protocol::payload::payload::{PayloadRequestKind, PayloadResponseKind, SubmitPostClaimResponseV1, SubmitPostClaimV1, SubmitPostCommitResponseV1, SubmitPostCommitTokenV1, SubmitPostCommitV1, SubmitPostFeedbackResponseV1, SubmitPostFeedbackV1};
use crate::protocol::rpc;
use crate::tools::buckets::{bucket_durations_for_type, generate_bucket_location, BucketLocation, BucketType};
use crate::tools::client_id::ClientId;
use crate::tools::time::TimeMillis;
use crate::tools::tools::spawn_background_task;
use crate::tools::{config, json};
use crate::tools::runtime_services::RuntimeServices;
use bytes::Bytes;
use futures::channel::mpsc;
use futures::StreamExt;
use log::{info, trace, warn};
use scraper::{Html, Selector};
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::protocol::posting::amplification;
use crate::protocol::posting::encoded_post_feedback::EncodedPostFeedbackV1;
use crate::tools::types::Id;
/// One bucket family a post has to be submitted to, as derived from the post body by
/// `build_locations`.
struct LinkedBaseIdDetail {
/// The id the bucket location is generated from: the author's id for `User`, the id derived
/// from the hashtag text for `Hashtag`, the `client_id` attribute for `Mention`, and the
/// `post_id` attribute for `ReplyToPost` / `Sequel`.
linked_base_id: Id,
/// Which kind of bucket this entry targets; also selects the candidate bucket durations.
bucket_type: BucketType,
/// Bytes decoded from the `post_header_hex` attribute of a `<reply>` / `<sequel>` element.
/// `None` for every other bucket type, and when the attribute is absent or not valid hex.
/// Forwarded unchanged into the submit-post claim request.
referenced_post_header_bytes: Option<Bytes>,
}
/// A single successful commit, sent over the outcomes channel from `post_to_location` to the
/// receive loop in `submit_post`.
struct SubmissionOutcome {
/// Bucket type the commit landed in; `submit_post` uses it to detect the first `User` commit.
bucket_type: BucketType,
/// The commit token taken from the peer's `SubmitPostCommitResponseV1`.
token: SubmitPostCommitTokenV1,
}
#[allow(clippy::too_many_arguments)] pub async fn submit_post(
runtime_services: Arc<RuntimeServices>,
client_id: &ClientId,
key_locker: &Arc<dyn KeyLocker>,
post_bundle_manager: Arc<LivePostBundleManager>,
peer_tracker: Arc<RwLock<PeerTracker>>,
recent_posts_pen: &Arc<RwLock<RecentPostsPen>>,
post: &str,
wait_for_all_submissions: bool,
) -> anyhow::Result<(Vec<SubmitPostCommitTokenV1>, (EncodedPostV1, Bytes))> {
trace!("submitting post: {}", post);
if post.is_empty() {
anyhow::bail!("Post cannot be empty");
}
let timestamp = runtime_services.time_provider.current_time_millis();
let (linked_base_id_details, referenced_hashtags) = build_locations(client_id, post)?;
let linked_base_ids: Vec<Id> = linked_base_id_details.iter().map(|d| d.linked_base_id).collect();
let mut encoded_post = EncodedPostV1::new(client_id, timestamp, linked_base_ids, post);
let encoded_post_bytes = encoded_post.encode_to_bytes_direct(key_locker).await?;
let encoded_post_bytes_raw = Bytes::copy_from_slice(encoded_post_bytes.bytes());
let (outcomes_tx, mut outcomes_rx) = mpsc::unbounded::<SubmissionOutcome>();
spawn_background_task(post_to_locations(
runtime_services,
client_id.clone(),
post_bundle_manager,
peer_tracker,
recent_posts_pen.clone(),
linked_base_id_details,
encoded_post.clone(),
encoded_post_bytes,
encoded_post_bytes_raw.clone(),
referenced_hashtags,
timestamp,
outcomes_tx,
));
let mut post_commit_tokens = Vec::new();
let mut user_committed = false;
while let Some(SubmissionOutcome { bucket_type, token }) = outcomes_rx.next().await {
if bucket_type == BucketType::User {
user_committed = true;
}
post_commit_tokens.push(token);
if !wait_for_all_submissions && user_committed {
return Ok((post_commit_tokens, (encoded_post, encoded_post_bytes_raw)));
}
}
if !user_committed {
anyhow::bail!("Failed to post to any User buckets, so bailing,");
}
Ok((post_commit_tokens, (encoded_post, encoded_post_bytes_raw)))
}
/// Records a freshly committed post in the recent-posts pen.
///
/// Takes the pen's write lock and adds the raw encoded post under the single
/// `(bucket_location, post_id)` pair carried by `token`, stamped with `timestamp`.
async fn record_in_pen(
    recent_posts_pen: &Arc<RwLock<RecentPostsPen>>,
    token: &SubmitPostCommitTokenV1,
    encoded_post_bytes_raw: &Bytes,
    timestamp: TimeMillis,
) {
    let committed_at = [(token.bucket_location.clone(), token.post_id)];
    let post_bytes = encoded_post_bytes_raw.clone();
    recent_posts_pen.write().await.add_all(&committed_at, post_bytes, timestamp);
}
fn build_locations(client_id: &ClientId, post: &str) -> anyhow::Result<(Vec<LinkedBaseIdDetail>, Vec<String>)> {
let mut linked_base_id_details: Vec<LinkedBaseIdDetail> = vec![];
let mut referenced_hashtags: Vec<String> = vec![];
linked_base_id_details.push(LinkedBaseIdDetail { linked_base_id: client_id.id, bucket_type: BucketType::User, referenced_post_header_bytes: None });
{
let html = Html::parse_fragment(post);
{
let is_quoted = |element: scraper::ElementRef| -> bool {
let mut node = element.parent();
while let Some(n) = node {
if let Some(el) = scraper::ElementRef::wrap(n) {
if matches!(el.value().name(), "reply" | "repost" | "sequel") { return true; }
}
node = n.parent();
}
false
};
let selector_hashtag = Selector::parse("hashtag").map_err(|e| anyhow::anyhow!("Failed to parse hashtag selector: {}", e))?;
let mut seen_hashtags: HashSet<String> = HashSet::new();
for element in html.select(&selector_hashtag) {
if is_quoted(element) { continue; }
if let Some(hashtag) = element.attr("hashtag") {
if !seen_hashtags.insert(hashtag.to_string()) { continue; }
trace!("hashtag={:?}", hashtag);
referenced_hashtags.push(hashtag.to_string());
linked_base_id_details.push(LinkedBaseIdDetail { linked_base_id: Id::from_hashtag_str(hashtag)?, bucket_type: BucketType::Hashtag, referenced_post_header_bytes: None });
} else {
warn!("hashtag attribute not found in element {:?}", element);
}
}
let selector_mention = Selector::parse("mention").map_err(|e| anyhow::anyhow!("Failed to parse mention selector: {}", e))?;
let mut seen_mentions: HashSet<Id> = HashSet::new();
for element in html.select(&selector_mention) {
if is_quoted(element) { continue; }
if let Some(client_id_str) = element.attr("client_id") {
match Id::from_hex_str(client_id_str) {
Ok(client_id) => {
if !seen_mentions.insert(client_id) { continue; }
trace!("mention_id={:?}", client_id);
linked_base_id_details.push(LinkedBaseIdDetail { linked_base_id: client_id, bucket_type: BucketType::Mention, referenced_post_header_bytes: None });
}
Err(e) => warn!("mention_id corrupted in element {:?}:, {}", element, e),
}
} else {
warn!("mention attribute not found in element {:?}", element);
}
}
let selector_reply = Selector::parse("reply").map_err(|e| anyhow::anyhow!("Failed to parse reply selector: {}", e))?;
let mut seen_replies: HashSet<Id> = HashSet::new();
for element in html.select(&selector_reply) {
if is_quoted(element) { continue; }
if let Some(post_id_str) = element.attr("post_id") {
match Id::from_hex_str(post_id_str) {
Ok(post_id) => {
if !seen_replies.insert(post_id) { continue; }
trace!("reply post_id={:?}", post_id);
let referenced_post_header_bytes = element.attr("post_header_hex")
.and_then(|hex_str| hex::decode(hex_str).ok())
.map(Bytes::from);
linked_base_id_details.push(LinkedBaseIdDetail { linked_base_id: post_id, bucket_type: BucketType::ReplyToPost, referenced_post_header_bytes });
}
Err(e) => warn!("reply post_id corrupted in element {:?}: {}", element, e),
}
} else {
warn!("post_id attribute not found in reply element {:?}", element);
}
}
let selector_sequel = Selector::parse("sequel").map_err(|e| anyhow::anyhow!("Failed to parse sequel selector: {}", e))?;
let mut seen_sequels: HashSet<Id> = HashSet::new();
for element in html.select(&selector_sequel) {
if is_quoted(element) { continue; }
if let Some(post_id_str) = element.attr("post_id") {
match Id::from_hex_str(post_id_str) {
Ok(post_id) => {
if !seen_sequels.insert(post_id) { continue; }
trace!("sequel post_id={:?}", post_id);
let referenced_post_header_bytes = element.attr("post_header_hex")
.and_then(|hex_str| hex::decode(hex_str).ok())
.map(Bytes::from);
linked_base_id_details.push(LinkedBaseIdDetail { linked_base_id: post_id, bucket_type: BucketType::Sequel, referenced_post_header_bytes });
}
Err(e) => warn!("sequel post_id corrupted in element {:?}: {}", element, e),
}
} else {
warn!("post_id attribute not found in sequel element {:?}", element);
}
}
}
}
Ok((linked_base_id_details, referenced_hashtags))
}
/// Background half of `submit_post`: walks every linked bucket family and tries to commit the
/// post to one bucket location of each.
///
/// For each detail the candidate durations from `bucket_durations_for_type` are tried in order.
/// A location whose bundle header is overflowed or sealed is skipped; a failed posting attempt
/// moves on to the next duration; the first successful attempt ends the search for that detail.
/// An error while generating the location or fetching the bundle abandons the remaining
/// durations of that detail and is logged.
///
/// Successful commits are reported through `outcomes_tx` by `post_to_location`. If the `User`
/// detail ends up with no commit at all, the function returns immediately and the remaining
/// details are not attempted.
// The argument list mirrors everything `post_to_location` needs per attempt.
#[allow(clippy::too_many_arguments)]
async fn post_to_locations(
    runtime_services: Arc<RuntimeServices>,
    client_id: ClientId,
    post_bundle_manager: Arc<LivePostBundleManager>,
    peer_tracker: Arc<RwLock<PeerTracker>>,
    recent_posts_pen: Arc<RwLock<RecentPostsPen>>,
    linked_base_id_details: Vec<LinkedBaseIdDetail>,
    encoded_post: EncodedPostV1,
    encoded_post_bytes: EncodedPostBytesV1,
    encoded_post_bytes_raw: Bytes,
    referenced_hashtags: Vec<String>,
    timestamp: TimeMillis,
    outcomes_tx: mpsc::UnboundedSender<SubmissionOutcome>,
) {
    for detail in &linked_base_id_details {
        let bucket_type = detail.bucket_type;
        let linked_base_id = detail.linked_base_id;
        trace!("Posting to bucket type: {:?}, linked_base_id: {}", bucket_type, linked_base_id);

        let mut committed_count = 0;
        let attempt: anyhow::Result<()> = try {
            for &bucket_duration in bucket_durations_for_type(bucket_type) {
                let bucket_location = generate_bucket_location(bucket_type, linked_base_id, bucket_duration, timestamp)?;
                info!("checking posting availability of {:?}", bucket_location);
                let post_bundle = post_bundle_manager.get_post_bundle(&bucket_location, timestamp).await?;

                // A full or sealed bundle cannot take the post; fall through to the next duration.
                if post_bundle.header.overflowed || post_bundle.header.sealed {
                    trace!("no availability: overflowed={} sealed={}", post_bundle.header.overflowed, post_bundle.header.sealed);
                    continue;
                }

                info!("Posting to {:?}", bucket_location);
                let posting = post_to_location(
                    &runtime_services,
                    &client_id.id,
                    &peer_tracker,
                    &recent_posts_pen,
                    &bucket_location,
                    &encoded_post,
                    &encoded_post_bytes,
                    &encoded_post_bytes_raw,
                    detail.referenced_post_header_bytes.as_deref(),
                    &referenced_hashtags,
                    bucket_type,
                    timestamp,
                    &outcomes_tx,
                )
                .await;
                match posting {
                    Ok(commit_tokens) => {
                        committed_count += commit_tokens.len();
                        break;
                    }
                    Err(e) => warn!("Failed to post to {:?}: {}", bucket_location, e),
                }
            }
        };
        if let Err(e) = attempt {
            warn!("Failed to post to any bucket location for linked_base_id {:?}: {}", linked_base_id, e);
        }

        // Without a commit in the author's own bucket there is no point in continuing.
        let user_bucket_unserved = bucket_type == BucketType::User && committed_count == 0;
        if user_bucket_unserved {
            return;
        }
    }
}
/// Submits the encoded post to the peers responsible for one bucket location.
///
/// Peers are visited through the iterator returned by
/// `peer_tracker.iterate_to_location`. Each peer is first asked for a claim
/// (`SubmitPostClaimV1`, sent with the proof-of-work level from
/// `amplification::get_minimum_post_pow`); the nearer peers it reports are fed back into the
/// iterator. When the peer grants a claim token, the full post is committed to it
/// (`SubmitPostCommitV1`), the commit is recorded in the recent-posts pen and reported on
/// `outcomes_tx`. The walk stops once `config::REDUNDANT_SERVERS_PER_POST` commits have
/// succeeded or the iterator is exhausted. A peer whose exchange fails is logged and removed
/// from the iterator.
///
/// The `peer_tracker` write lock is held for the whole walk, including the RPC round-trips.
///
/// # Errors
/// Fails when the peer iterator cannot be created, when no peer granted a claim, or when no
/// commit succeeded.
// The argument list mirrors the per-attempt state handed down by `post_to_locations`.
#[allow(clippy::too_many_arguments)]
async fn post_to_location(
    runtime_services: &RuntimeServices,
    sponsor_id: &Id,
    peer_tracker: &Arc<RwLock<PeerTracker>>,
    recent_posts_pen: &Arc<RwLock<RecentPostsPen>>,
    bucket_location: &BucketLocation,
    encoded_post: &EncodedPostV1,
    encoded_post_bytes: &EncodedPostBytesV1,
    encoded_post_bytes_raw: &Bytes,
    referenced_post_header_bytes: Option<&[u8]>,
    referenced_hashtags: &[String],
    bucket_type: BucketType,
    timestamp: TimeMillis,
    outcomes_tx: &mpsc::UnboundedSender<SubmissionOutcome>,
) -> anyhow::Result<Vec<SubmitPostCommitTokenV1>> {
    // Only whether any claim was granted matters afterwards, so a flag replaces the old
    // vector of cloned claim tokens.
    let mut claim_received = false;
    let mut submit_post_commit_tokens = Vec::new();
    let mut peer_tracker = peer_tracker.write().await;
    let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, 2 * config::REDUNDANT_SERVERS_PER_POST, None).await?;
    while let Some((peer, leading_agreement_bits)) = peer_iter.next_peer() {
        let result: anyhow::Result<()> = try {
            info!("Requesting SubmitPostClaim with leading_agreement_bits={} from peer {}", leading_agreement_bits, peer);
            let request = SubmitPostClaimV1::new_to_bytes(bucket_location, referenced_post_header_bytes, referenced_hashtags, encoded_post_bytes.bytes_without_body())?;
            let requisite_pow = amplification::get_minimum_post_pow(encoded_post.header.post_length, encoded_post.header.linked_base_ids.len(), bucket_location.duration);
            let response = rpc::rpc::rpc_server_known_with_requisite_pow_and_no_compression(runtime_services, sponsor_id, &peer, PayloadRequestKind::SubmitPostClaimV1, request, requisite_pow).await?;
            anyhow_assert_eq!(&PayloadResponseKind::SubmitPostClaimResponseV1, &response.response_request_kind);
            let response = json::bytes_to_struct::<SubmitPostClaimResponseV1>(&response.bytes)?;
            // Let the iterator learn about closer peers even when this peer grants no claim.
            peer_iter.add_peers(response.peers_nearer);
            if let Some(submit_post_claim_token) = response.submit_post_claim_token {
                info!("Received a SubmitPostClaim from peer {}", peer);
                claim_received = true;
                info!("Posting to peer {}", peer);
                let request = SubmitPostCommitV1::new_to_bytes(bucket_location, &submit_post_claim_token, encoded_post_bytes.bytes())?;
                let response = rpc::rpc::rpc_server_known_with_no_compression(runtime_services, sponsor_id, &peer, PayloadRequestKind::SubmitPostCommitV1, request).await?;
                anyhow_assert_eq!(&PayloadResponseKind::SubmitPostCommitResponseV1, &response.response_request_kind);
                let response = json::bytes_to_struct::<SubmitPostCommitResponseV1>(&response.bytes)?;
                let submit_post_commit_token = response.submit_post_commit_token;
                record_in_pen(recent_posts_pen, &submit_post_commit_token, encoded_post_bytes_raw, timestamp).await;
                // The receiver may already be gone (early return in `submit_post`); that is fine.
                let _ = outcomes_tx.unbounded_send(SubmissionOutcome { bucket_type, token: submit_post_commit_token.clone() });
                submit_post_commit_tokens.push(submit_post_commit_token);
                if submit_post_commit_tokens.len() >= config::REDUNDANT_SERVERS_PER_POST {
                    break;
                }
            }
        };
        if let Err(e) = result {
            warn!("Error retrieving SubmitPostClaim from peer {}: {}", peer, e);
            peer_iter.remove_peer(&peer);
        }
    }
    if !claim_received {
        anyhow::bail!("Despite expecting availability, we were unable to claim anywhere at {}", bucket_location);
    }
    if submit_post_commit_tokens.is_empty() {
        anyhow::bail!("Despite expecting availability, we were unable to commit anywhere at {}", bucket_location);
    }
    Ok(submit_post_commit_tokens)
}
/// Submits encoded post feedback to the peers responsible for one bucket location.
///
/// Peers are visited through the iterator returned by
/// `peer_tracker.iterate_to_location`. Each peer receives a `SubmitPostFeedbackV1` request sent
/// with `config::POW_MINIMUM_PER_FEEDBACK`; the nearer peers it reports are fed back into the
/// iterator. The walk stops once `config::REDUNDANT_SERVERS_PER_POST` peers have accepted the
/// feedback or the iterator is exhausted. A peer whose exchange fails is logged and removed
/// from the iterator.
///
/// The `peer_tracker` write lock is held for the whole walk, including the RPC round-trips.
///
/// # Errors
/// Fails when the peer iterator cannot be created or when no peer accepted the feedback.
pub async fn post_feedback_to_location(
    runtime_services: &RuntimeServices,
    sponsor_id: &Id,
    peer_tracker: &Arc<RwLock<PeerTracker>>,
    bucket_location: &BucketLocation,
    encoded_post_feedback: &EncodedPostFeedbackV1,
) -> anyhow::Result<()> {
    // Only the number of accepts is ever consulted, so count them instead of collecting peers.
    let mut accepted_count: usize = 0;
    let mut peer_tracker = peer_tracker.write().await;
    let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, 2 * config::REDUNDANT_SERVERS_PER_POST, None).await?;
    while let Some((peer, leading_agreement_bits)) = peer_iter.next_peer() {
        let result: anyhow::Result<()> = try {
            info!("Requesting SubmitPostFeedbackV1 with leading_agreement_bits={} from peer {}", leading_agreement_bits, peer);
            let request = SubmitPostFeedbackV1::new_to_bytes(bucket_location, encoded_post_feedback)?;
            let response = rpc::rpc::rpc_server_known_with_requisite_pow(runtime_services, sponsor_id, &peer, PayloadRequestKind::SubmitPostFeedbackV1, request, config::POW_MINIMUM_PER_FEEDBACK).await?;
            anyhow_assert_eq!(&PayloadResponseKind::SubmitPostFeedbackResponseV1, &response.response_request_kind);
            let response = json::bytes_to_struct::<SubmitPostFeedbackResponseV1>(&response.bytes)?;
            // Let the iterator learn about closer peers even when this peer does not accept.
            peer_iter.add_peers(response.peers_nearer);
            if response.accepted {
                info!("Received an accept from peer {}", peer);
                accepted_count += 1;
                if accepted_count >= config::REDUNDANT_SERVERS_PER_POST {
                    break;
                }
            }
        };
        if let Err(e) = result {
            warn!("Error retrieving SubmitPostFeedbackV1 from peer {}: {}", peer, e);
            peer_iter.remove_peer(&peer);
        }
    }
    if accepted_count == 0 {
        anyhow::bail!("Despite expecting availability, we were unable to commit feedback anywhere at {}", bucket_location.location_id);
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    //! Unit tests for `build_locations`: which bucket details a post body produces and how
    //! repeated references are deduplicated.
    use super::*;
    use crate::tools::keys::Keys;

    /// Two distinct 64-hex-digit ids used as mention / reply / sequel targets.
    const ID_A: &str = "1111111111111111111111111111111111111111111111111111111111111111";
    const ID_B: &str = "2222222222222222222222222222222222222222222222222222222222222222";

    /// Builds a throwaway client identity from freshly generated keys.
    fn test_client_id() -> anyhow::Result<ClientId> {
        let fresh_keys = Keys::from_rnd(false)?;
        ClientId::new(fresh_keys.verification_key_bytes, fresh_keys.pq_commitment_bytes)
    }

    /// Returns the details whose bucket type equals `bucket_type`, in their original order.
    fn details_of(details: &[LinkedBaseIdDetail], bucket_type: BucketType) -> Vec<&LinkedBaseIdDetail> {
        let mut matching = Vec::new();
        for detail in details {
            if detail.bucket_type == bucket_type {
                matching.push(detail);
            }
        }
        matching
    }

    /// A body without any reference elements only targets the author's own bucket.
    #[tokio::test]
    async fn plain_post_yields_only_self_user_bucket() -> anyhow::Result<()> {
        let author = test_client_id()?;
        let (details, hashtags) = build_locations(&author, "just some text with no references")?;
        assert_eq!(details.len(), 1);
        assert_eq!(details[0].bucket_type, BucketType::User);
        assert_eq!(details[0].linked_base_id, author.id);
        assert!(hashtags.is_empty());
        Ok(())
    }

    /// The same hashtag twice produces a single hashtag string and a single detail.
    #[tokio::test]
    async fn repeated_hashtag_is_deduplicated() -> anyhow::Result<()> {
        let author = test_client_id()?;
        let body = r#"<hashtag hashtag="rust"></hashtag> and again <hashtag hashtag="rust"></hashtag>"#;
        let (details, hashtags) = build_locations(&author, body)?;
        assert_eq!(hashtags, vec!["rust".to_string()]);
        assert_eq!(details_of(&details, BucketType::Hashtag).len(), 1);
        Ok(())
    }

    /// Mentioning the same client twice produces a single mention detail.
    #[tokio::test]
    async fn repeated_mention_is_deduplicated() -> anyhow::Result<()> {
        let author = test_client_id()?;
        let body = format!(r#"<mention client_id="{ID_A}"></mention> hi again <mention client_id="{ID_A}"></mention>"#);
        let (details, _) = build_locations(&author, &body)?;
        let mentions = details_of(&details, BucketType::Mention);
        assert_eq!(mentions.len(), 1);
        Ok(())
    }

    /// Mentions of different clients each get their own detail.
    #[tokio::test]
    async fn distinct_mentions_are_kept() -> anyhow::Result<()> {
        let author = test_client_id()?;
        let body = format!(r#"<mention client_id="{ID_A}"></mention> <mention client_id="{ID_B}"></mention>"#);
        let (details, _) = build_locations(&author, &body)?;
        let mentions = details_of(&details, BucketType::Mention);
        assert_eq!(mentions.len(), 2);
        Ok(())
    }

    /// For a repeated reply target the header bytes of the first element win.
    #[tokio::test]
    async fn repeated_reply_keeps_first_header_bytes() -> anyhow::Result<()> {
        let author = test_client_id()?;
        let body = format!(r#"<reply post_id="{ID_A}" post_header_hex="aabb"></reply> <reply post_id="{ID_A}" post_header_hex="ccdd"></reply>"#);
        let (details, _) = build_locations(&author, &body)?;
        let replies = details_of(&details, BucketType::ReplyToPost);
        assert_eq!(replies.len(), 1);
        assert_eq!(replies[0].referenced_post_header_bytes.as_deref(), Some(&[0xaa, 0xbb][..]));
        Ok(())
    }

    /// For a repeated sequel target only the first element is kept, header bytes included.
    #[tokio::test]
    async fn repeated_sequel_is_deduplicated() -> anyhow::Result<()> {
        let author = test_client_id()?;
        let body = format!(r#"<sequel post_id="{ID_A}" post_header_hex="aabb"></sequel> <sequel post_id="{ID_A}" post_header_hex="ccdd"></sequel>"#);
        let (details, _) = build_locations(&author, &body)?;
        let sequels = details_of(&details, BucketType::Sequel);
        assert_eq!(sequels.len(), 1);
        assert_eq!(sequels[0].referenced_post_header_bytes.as_deref(), Some(&[0xaa, 0xbb][..]));
        Ok(())
    }

    /// Deduplication is per element kind: one id used as mention and as reply yields two details.
    #[tokio::test]
    async fn same_id_as_mention_and_reply_stays_two_details() -> anyhow::Result<()> {
        let author = test_client_id()?;
        let body = format!(r#"<mention client_id="{ID_A}"></mention> <reply post_id="{ID_A}"></reply>"#);
        let (details, _) = build_locations(&author, &body)?;
        assert_eq!(details_of(&details, BucketType::Mention).len(), 1);
        assert_eq!(details_of(&details, BucketType::ReplyToPost).len(), 1);
        Ok(())
    }
}